
split get_results into get_result/result_status, add AsyncHubResult

1 parent 11c0dbe · commit 7724ff799f6b5c5c016076855b586dcbbec86f8d · minrk committed Feb 27, 2011
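
In short, the old Client.get_results is split in two: Client.get_result wraps one or more msg_ids or history indices in an AsyncResult (or the new AsyncHubResult when some results have to be fetched from the Hub), while Client.result_status answers status queries and can optionally retrieve completed results. A minimal sketch of the new calling pattern, assuming a running controller and engines; the names follow the diff below:

    from IPython.zmq.parallel import client

    rc = client.Client()
    dview = rc[:]                                # DirectView on all engines

    ar = dview.apply_async(lambda : 2 + 2)       # non-blocking -> AsyncResult
    rc.barrier(ar, timeout=5)                    # wait on jobs, msg_ids, or history indices

    ar2 = rc.get_result(-1)                      # wrap the most recent msg_id
    print ar2.get()
    print rc.result_status(ar2.msg_ids)          # {'pending': [...], 'completed': [...]}
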
IPython/zmq/parallel/asyncresult.py (64 lines changed)
@@ -10,6 +10,8 @@
# Imports
#-----------------------------------------------------------------------------
+import time
+
from IPython.external.decorator import decorator
import error
@@ -189,6 +191,23 @@ def __getattr__(self, key):
raise AttributeError("%r object has no attribute %r"%(
self.__class__.__name__, key))
return self.__getitem__(key)
+
+ # asynchronous iterator:
+ def __iter__(self):
+ if self._single_result:
+ raise TypeError("AsyncResults with a single result are not iterable.")
+ try:
+ rlist = self.get(0)
+ except error.TimeoutError:
+ # wait for each result individually
+ for msg_id in self.msg_ids:
+ ar = AsyncResult(self._client, msg_id, self._fname)
+ yield ar.get()
+ else:
+ # already done
+ for r in rlist:
+ yield r
+
class AsyncMapResult(AsyncResult):
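
The asynchronous iterator means a multi-result AsyncResult can be consumed lazily: if the results are not all done yet, each msg_id is waited on in turn. A hedged sketch, assuming `rc` is a connected Client and `dview = rc[:]` as in the docs below:

    ar = dview.apply_async(lambda : 2 + 2)   # one task per engine
    for r in ar:                             # results are yielded as they complete
        print r
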
@@ -227,6 +246,49 @@ def __iter__(self):
# already done
for r in rlist:
yield r
+
+
+class AsyncHubResult(AsyncResult):
+ """Class to wrap pending results that must be requested from the Hub"""
+ def wait(self, timeout=-1):
+ """wait for result to complete."""
+ start = time.time()
+ if self._ready:
+ return
+ local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
+ local_ready = self._client.barrier(local_ids, timeout)
+ if local_ready:
+ remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
+ if not remote_ids:
+ self._ready = True
+ else:
+ rdict = self._client.result_status(remote_ids, status_only=False)
+ pending = rdict['pending']
+ while pending and time.time() < start+timeout:
+ rdict = self._client.result_status(remote_ids, status_only=False)
+ pending = rdict['pending']
+ if pending:
+ time.sleep(0.1)
+ if not pending:
+ self._ready = True
+ if self._ready:
+ try:
+ results = map(self._client.results.get, self.msg_ids)
+ self._result = results
+ if self._single_result:
+ r = results[0]
+ if isinstance(r, Exception):
+ raise r
+ else:
+ results = error.collect_exceptions(results, self._fname)
+ self._result = self._reconstruct_result(results)
+ except Exception, e:
+ self._exception = e
+ self._success = False
+ else:
+ self._success = True
+ finally:
+ self._metadata = map(self._client.metadata.get, self.msg_ids)
-__all__ = ['AsyncResult', 'AsyncMapResult']
+__all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']
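
AsyncHubResult.wait first barriers on any msg_ids this client submitted itself, then polls result_status until the remaining results arrive at the Hub or the timeout expires. A hedged sketch of retrieving work submitted by a different session (`some_msg_id` is a placeholder for a msg_id obtained elsewhere, not a real identifier):

    from IPython.zmq.parallel import client

    other = client.Client()                  # a second, independent client session
    ahr = other.get_result(some_msg_id)      # some_msg_id: placeholder for a msg_id
                                             # submitted by a different client
    ahr.wait(10)                             # barrier locally, then poll the Hub
    if ahr.ready():
        print ahr.get()
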
IPython/zmq/parallel/client.py (225 lines changed)
@@ -32,7 +32,7 @@
import error
import map as Map
import streamsession as ss
-from asyncresult import AsyncResult, AsyncMapResult
+from asyncresult import AsyncResult, AsyncMapResult, AsyncHubResult
from clusterdir import ClusterDir, ClusterDirError
from dependency import Dependency, depend, require, dependent
from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
@@ -485,6 +485,15 @@ def connect_socket(s, url):
# handlers and callbacks for incoming messages
#--------------------------------------------------------------------------
+ def _unwrap_exception(self, content):
+ """unwrap exception, and remap engineid to int."""
+ e = ss.unwrap_exception(content)
+ if e.engine_info:
+ e_uuid = e.engine_info['engineid']
+ eid = self._engines[e_uuid]
+ e.engine_info['engineid'] = eid
+ return e
+
def _register_engine(self, msg):
"""Register a new engine, and update our connection info."""
content = msg['content']
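
The new _unwrap_exception helper centralizes exception unwrapping and remaps the engine uuid in engine_info to the integer id users see elsewhere. Roughly, the effect for a caller looks like this; a sketch only, assuming the RemoteError attributes defined in the package's error module:

    from IPython.zmq.parallel import error

    try:
        rc[0].apply_sync(lambda : 1/0)
    except error.RemoteError, e:
        print e.engine_info['engineid']      # now the int engine id, not its uuid
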
@@ -537,7 +546,7 @@ def _handle_execute_reply(self, msg):
print ("got unknown result: %s"%msg_id)
else:
self.outstanding.remove(msg_id)
- self.results[msg_id] = ss.unwrap_exception(msg['content'])
+ self.results[msg_id] = self._unwrap_exception(msg['content'])
def _handle_apply_reply(self, msg):
"""Save the reply to an apply_request into our results."""
@@ -569,12 +578,7 @@ def _handle_apply_reply(self, msg):
# TODO: handle resubmission
pass
else:
- e = ss.unwrap_exception(content)
- if e.engine_info:
- e_uuid = e.engine_info['engineid']
- eid = self._engines[e_uuid]
- e.engine_info['engineid'] = eid
- self.results[msg_id] = e
+ self.results[msg_id] = self._unwrap_exception(content)
def _flush_notifications(self):
"""Flush notifications of engine registrations waiting
@@ -641,7 +645,7 @@ def _flush_iopub(self, sock):
s = md[name] or ''
md[name] = s + content['data']
elif msg_type == 'pyerr':
- md.update({'pyerr' : ss.unwrap_exception(content)})
+ md.update({'pyerr' : self._unwrap_exception(content)})
else:
md.update({msg_type : content['data']})
@@ -685,13 +689,13 @@ def spin(self):
if self._iopub_socket:
self._flush_iopub(self._iopub_socket)
- def barrier(self, msg_ids=None, timeout=-1):
- """waits on one or more `msg_ids`, for up to `timeout` seconds.
+ def barrier(self, jobs=None, timeout=-1):
+ """waits on one or more `jobs`, for up to `timeout` seconds.
Parameters
----------
- msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects
+ jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
ints are indices to self.history
strs are msg_ids
default: wait on all outstanding messages
@@ -706,19 +710,20 @@ def barrier(self, msg_ids=None, timeout=-1):
False : timeout reached, some msg_ids still outstanding
"""
tic = time.time()
- if msg_ids is None:
+ if jobs is None:
theids = self.outstanding
else:
- if isinstance(msg_ids, (int, str, AsyncResult)):
- msg_ids = [msg_ids]
+ if isinstance(jobs, (int, str, AsyncResult)):
+ jobs = [jobs]
theids = set()
- for msg_id in msg_ids:
- if isinstance(msg_id, int):
- msg_id = self.history[msg_id]
- elif isinstance(msg_id, AsyncResult):
- map(theids.add, msg_id.msg_ids)
+ for job in jobs:
+ if isinstance(job, int):
+ # index access
+ job = self.history[job]
+ elif isinstance(job, AsyncResult):
+ map(theids.add, job.msg_ids)
continue
- theids.add(msg_id)
+ theids.add(job)
if not theids.intersection(self.outstanding):
return True
self.spin()
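
With this change, barrier accepts history indices, msg_id strings, and AsyncResult objects interchangeably (collectively called "jobs"). A hedged sketch:

    ar = dview.apply_async(lambda : __import__('time').sleep(2))
    done = rc.barrier([ar, -1], timeout=5)   # mix an AsyncResult with a history index
    print done                               # True only if everything finished in time
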
@@ -747,18 +752,39 @@ def clear(self, targets=None, block=None):
if self.debug:
pprint(msg)
if msg['content']['status'] != 'ok':
- error = ss.unwrap_exception(msg['content'])
+ error = self._unwrap_exception(msg['content'])
if error:
return error
@spinfirst
@defaultblock
- def abort(self, msg_ids = None, targets=None, block=None):
- """Abort the execution queues of target(s)."""
+ def abort(self, jobs=None, targets=None, block=None):
+ """Abort specific jobs from the execution queues of target(s).
+
+ This is a mechanism to prevent jobs that have already been submitted
+ from executing.
+
+ Parameters
+ ----------
+
+ jobs : msg_id, list of msg_ids, or AsyncResult
+ The jobs to be aborted
+
+
+ """
targets = self._build_targets(targets)[0]
- if isinstance(msg_ids, basestring):
- msg_ids = [msg_ids]
+ msg_ids = []
+ if isinstance(jobs, (basestring,AsyncResult)):
+ jobs = [jobs]
+ bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
+ if bad_ids:
+ raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
+ for j in jobs:
+ if isinstance(j, AsyncResult):
+ msg_ids.extend(j.msg_ids)
+ else:
+ msg_ids.append(j)
content = dict(msg_ids=msg_ids)
for t in targets:
self.session.send(self._control_socket, 'abort_request',
@@ -770,7 +796,7 @@ def abort(self, msg_ids = None, targets=None, block=None):
if self.debug:
pprint(msg)
if msg['content']['status'] != 'ok':
- error = ss.unwrap_exception(msg['content'])
+ error = self._unwrap_exception(msg['content'])
if error:
return error
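
abort likewise now takes AsyncResult objects (or msg_ids), so a pending submission can be cancelled directly. A sketch:

    ar = dview.apply_async(lambda : __import__('time').sleep(60))  # long-running task
    rc.abort(jobs=ar)            # drop it from the engines' queues if still pending
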
@@ -791,7 +817,7 @@ def shutdown(self, targets=None, restart=False, controller=False, block=None):
if self.debug:
pprint(msg)
if msg['content']['status'] != 'ok':
- error = ss.unwrap_exception(msg['content'])
+ error = self._unwrap_exception(msg['content'])
if controller:
time.sleep(0.25)
@@ -800,7 +826,7 @@ def shutdown(self, targets=None, restart=False, controller=False, block=None):
if self.debug:
pprint(msg)
if msg['content']['status'] != 'ok':
- error = ss.unwrap_exception(msg['content'])
+ error = self._unwrap_exception(msg['content'])
if error:
raise error
@@ -827,8 +853,9 @@ def execute(self, code, targets='all', block=None):
whether or not to wait until done to return
default: self.block
"""
- result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True, balanced=False)
- return result
+ result = self.apply(_execute, (code,), targets=targets, block=block, bound=True, balanced=False)
+ if not block:
+ return result
def run(self, filename, targets='all', block=None):
"""Execute contents of `filename` on engine(s).
@@ -1134,6 +1161,8 @@ def view(self, targets=None, balanced=None):
targets = slice(None)
if isinstance(targets, int):
+ if targets < 0:
+ targets = self.ids[targets]
if targets not in self.ids:
raise IndexError("No such engine: %i"%targets)
return self._cache_view(targets, balanced)
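
Negative integer targets are now resolved against self.ids, so the usual "last engine" idiom works. A sketch, assuming the default balanced behaviour for explicit integer targets:

    v_last = rc.view(-1)      # view on the highest-numbered engine
    v_first = rc.view(0)
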
@@ -1159,7 +1188,8 @@ def push(self, ns, targets='all', block=None):
if not isinstance(ns, dict):
raise TypeError("Must be a dict, not %s"%type(ns))
result = self.apply(_push, (ns,), targets=targets, block=block, bound=True, balanced=False)
- return result
+ if not block:
+ return result
@defaultblock
def pull(self, keys, targets='all', block=None):
@@ -1191,7 +1221,7 @@ def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
msg_ids.extend(r.msg_ids)
r = AsyncResult(self, msg_ids, fname='scatter')
if block:
- return r.get()
+ r.get()
else:
return r
@@ -1218,41 +1248,112 @@ def gather(self, key, dist='b', targets='all', block=None):
#--------------------------------------------------------------------------
@spinfirst
- def get_results(self, msg_ids, status_only=False):
- """Returns the result of the execute or task request with `msg_ids`.
+ @defaultblock
+ def get_result(self, indices_or_msg_ids=None, block=None):
+ """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
+
+ If the client already has the results, no request to the Hub will be made.
+
+ This is a convenient way to construct AsyncResult objects, which are wrappers
+ that include metadata about execution, and allow for awaiting results that
+ were not submitted by this Client.
+
+ It can also be a convenient way to retrieve the metadata associated with
+ blocking execution, since it always retrieves the metadata as well.
+
+ Examples
+ --------
+ ::
+
+ In [10]: r = client.apply()
Parameters
----------
- msg_ids : list of ints or msg_ids
+ indices_or_msg_ids : integer history index, str msg_id, or list of either
+ The history indices or msg_ids of the results to be retrieved
+
+ block : bool
+ Whether to wait for the result to be done
+
+ Returns
+ -------
+
+ AsyncResult
+ A single AsyncResult object will always be returned.
+
+ AsyncHubResult
+ A subclass of AsyncResult that retrieves results from the Hub
+
+ """
+ if indices_or_msg_ids is None:
+ indices_or_msg_ids = -1
+
+ if not isinstance(indices_or_msg_ids, (list,tuple)):
+ indices_or_msg_ids = [indices_or_msg_ids]
+
+ theids = []
+ for id in indices_or_msg_ids:
+ if isinstance(id, int):
+ id = self.history[id]
+ if not isinstance(id, str):
+ raise TypeError("indices must be str or int, not %r"%id)
+ theids.append(id)
+
+ local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
+ remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
+
+ if remote_ids:
+ ar = AsyncHubResult(self, msg_ids=theids)
+ else:
+ ar = AsyncResult(self, msg_ids=theids)
+
+ if block:
+ ar.wait()
+
+ return ar
+
+ @spinfirst
+ def result_status(self, msg_ids, status_only=True):
+ """Check on the status of the result(s) of the apply request with `msg_ids`.
+
+ If status_only is False, then the actual results will be retrieved, else
+ only the status of the results will be checked.
+
+ Parameters
+ ----------
+
+ msg_ids : list of msg_ids
if int:
Passed as index to self.history for convenience.
- status_only : bool (default: False)
+ status_only : bool (default: True)
if False:
- return the actual results
+ Retrieve the actual results of completed tasks.
Returns
-------
results : dict
There will always be the keys 'pending' and 'completed', which will
- be lists of msg_ids.
+ be lists of msg_ids that are incomplete or complete. If `status_only`
+ is False, then completed results will be keyed by their `msg_id`.
"""
- if not isinstance(msg_ids, (list,tuple)):
- msg_ids = [msg_ids]
+ if not isinstance(msg_ids, (list,tuple)):
+ msg_ids = [msg_ids]
+
theids = []
- for msg_id in msg_ids:
+ for msg_id in msg_ids:
if isinstance(msg_id, int):
msg_id = self.history[msg_id]
- if not isinstance(msg_id, str):
+ if not isinstance(msg_id, basestring):
raise TypeError("msg_ids must be str, not %r"%msg_id)
theids.append(msg_id)
completed = []
local_results = {}
# comment this block out to temporarily disable local shortcut:
- for msg_id in list(theids):
+ for msg_id in theids:
if msg_id in self.results:
completed.append(msg_id)
local_results[msg_id] = self.results[msg_id]
@@ -1267,7 +1368,7 @@ def get_results(self, msg_ids, status_only=False):
pprint(msg)
content = msg['content']
if content['status'] != 'ok':
- raise ss.unwrap_exception(content)
+ raise self._unwrap_exception(content)
buffers = msg['buffers']
else:
content = dict(completed=[],pending=[])
@@ -1298,13 +1399,17 @@ def get_results(self, msg_ids, status_only=False):
if rcontent['status'] == 'ok':
res,buffers = ss.unserialize_object(buffers)
else:
- res = ss.unwrap_exception(rcontent)
+ print rcontent
+ res = self._unwrap_exception(rcontent)
failures.append(res)
self.results[msg_id] = res
content[msg_id] = res
- error.collect_exceptions(failures, "get_results")
+ if len(theids) == 1 and failures:
+ raise failures[0]
+
+ error.collect_exceptions(failures, "result_status")
return content
@spinfirst
@@ -1329,11 +1434,11 @@ def queue_status(self, targets='all', verbose=False):
content = msg['content']
status = content.pop('status')
if status != 'ok':
- raise ss.unwrap_exception(content)
+ raise self._unwrap_exception(content)
return ss.rekey(content)
@spinfirst
- def purge_results(self, msg_ids=[], targets=[]):
+ def purge_results(self, jobs=[], targets=[]):
"""Tell the controller to forget results.
Individual results can be purged by msg_id, or the entire
@@ -1342,26 +1447,40 @@ def purge_results(self, msg_ids=[], targets=[]):
Parameters
----------
- msg_ids : str or list of strs
+ jobs : str or list of strs or AsyncResult objects
the msg_ids whose results should be forgotten.
targets : int/str/list of ints/strs
The targets, by uuid or int_id, whose entire history is to be purged.
Use `targets='all'` to scrub everything from the controller's memory.
default : None
"""
- if not targets and not msg_ids:
- raise ValueError
+ if not targets and not jobs:
+ raise ValueError("Must specify at least one of `targets` and `jobs`")
if targets:
targets = self._build_targets(targets)[1]
+
+ # construct msg_ids from jobs
+ msg_ids = []
+ if isinstance(jobs, (basestring,AsyncResult)):
+ jobs = [jobs]
+ bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
+ if bad_ids:
+ raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
+ for j in jobs:
+ if isinstance(j, AsyncResult):
+ msg_ids.extend(j.msg_ids)
+ else:
+ msg_ids.append(j)
+
content = dict(targets=targets, msg_ids=msg_ids)
self.session.send(self._query_socket, "purge_request", content=content)
idents, msg = self.session.recv(self._query_socket, 0)
if self.debug:
pprint(msg)
content = msg['content']
if content['status'] != 'ok':
- raise ss.unwrap_exception(content)
+ raise self._unwrap_exception(content)
__all__ = [ 'Client',
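
Taken together, get_result and result_status replace the old get_results: the former hands back an AsyncResult (or AsyncHubResult) wrapper, the latter answers status queries against the Hub. A hedged sketch of the split:

    rc.execute('a = 5', targets='all', block=False)   # fire and forget
    ar = rc.get_result(-1)                            # wrap the last msg_id later
    print ar.get()

    # status only by default; pass status_only=False to also fetch completed results
    print rc.result_status(ar.msg_ids)                # keys: 'pending', 'completed'
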
IPython/zmq/parallel/remotefunction.py (2 lines changed)
@@ -128,7 +128,7 @@ def __call__(self, *sequences):
args = []
for seq in sequences:
part = self.mapObject.getPartition(seq, index, nparts)
- if not part:
+ if len(part) == 0:
continue
else:
args.append(part)
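
The switch from `not part` to `len(part) == 0` presumably accommodates numpy-array partitions, whose truth value is ambiguous; an explicit length test works for lists and arrays alike. For illustration:

    import numpy as np

    part = np.arange(4)
    print len(part) == 0      # False; safe for both lists and arrays
    # bool(part) would raise:
    #   ValueError: The truth value of an array with more than one element is ambiguous.
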
IPython/zmq/parallel/view.py (149 lines changed)
@@ -15,7 +15,7 @@
from IPython.external.decorator import decorator
from IPython.zmq.parallel.asyncresult import AsyncResult
from IPython.zmq.parallel.dependency import Dependency
-from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel
+from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel, remote
#-----------------------------------------------------------------------------
# Decorators
@@ -91,6 +91,8 @@ def __init__(self, client=None, targets=None):
for name in self._default_names:
setattr(self, name, getattr(self, name, None))
+ assert not self.__class__ is View, "Don't use base View objects, use subclasses"
+
def __repr__(self):
strtargets = str(self._targets)
@@ -106,9 +108,17 @@ def targets(self):
def targets(self, value):
raise AttributeError("Cannot set View `targets` after construction!")
+ @property
+ def balanced(self):
+ return self._balanced
+
+ @balanced.setter
+ def balanced(self, value):
+ raise AttributeError("Cannot set View `balanced` after construction!")
+
def _defaults(self, *excludes):
"""return dict of our default attributes, excluding names given."""
- d = dict(balanced=self._balanced, targets=self.targets)
+ d = dict(balanced=self._balanced, targets=self._targets)
for name in self._default_names:
if name not in excludes:
d[name] = getattr(self, name)
@@ -182,22 +192,22 @@ def apply_sync(self, f, *args, **kwargs):
d = self._defaults('block', 'bound')
return self.client.apply(f,args,kwargs, block=True, bound=False, **d)
- @sync_results
- @save_ids
- def apply_bound(self, f, *args, **kwargs):
- """calls f(*args, **kwargs) bound to engine namespace(s).
-
- if self.block is False:
- returns msg_id
- else:
- returns actual result of f(*args, **kwargs)
-
- This method has access to the targets' globals
-
- """
- d = self._defaults('bound')
- return self.client.apply(f, args, kwargs, bound=True, **d)
-
+ # @sync_results
+ # @save_ids
+ # def apply_bound(self, f, *args, **kwargs):
+ # """calls f(*args, **kwargs) bound to engine namespace(s).
+ #
+ # if self.block is False:
+ # returns msg_id
+ # else:
+ # returns actual result of f(*args, **kwargs)
+ #
+ # This method has access to the targets' namespace via globals()
+ #
+ # """
+ # d = self._defaults('bound')
+ # return self.client.apply(f, args, kwargs, bound=True, **d)
+ #
@sync_results
@save_ids
def apply_async_bound(self, f, *args, **kwargs):
@@ -206,7 +216,7 @@ def apply_async_bound(self, f, *args, **kwargs):
returns: msg_id
- This method has access to the targets' globals
+ This method has access to the targets' namespace via globals()
"""
d = self._defaults('block', 'bound')
@@ -219,35 +229,54 @@ def apply_sync_bound(self, f, *args, **kwargs):
returns: actual result of f(*args, **kwargs)
- This method has access to the targets' globals
+ This method has access to the targets' namespace via globals()
"""
d = self._defaults('block', 'bound')
return self.client.apply(f, args, kwargs, block=True, bound=True, **d)
- def abort(self, msg_ids=None, block=None):
+ def abort(self, jobs=None, block=None):
"""Abort jobs on my engines.
Parameters
----------
- msg_ids : None, str, list of strs, optional
+ jobs : None, str, list of strs, optional
if None: abort all jobs.
else: abort specific msg_id(s).
"""
block = block if block is not None else self.block
- return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
+ return self.client.abort(jobs=jobs, targets=self._targets, block=block)
def queue_status(self, verbose=False):
"""Fetch the Queue status of my engines"""
- return self.client.queue_status(targets=self.targets, verbose=verbose)
+ return self.client.queue_status(targets=self._targets, verbose=verbose)
- def purge_results(self, msg_ids=[], targets=[]):
+ def purge_results(self, jobs=[], targets=[]):
"""Instruct the controller to forget specific results."""
if targets is None or targets == 'all':
- targets = self.targets
- return self.client.purge_results(msg_ids=msg_ids, targets=targets)
+ targets = self._targets
+ return self.client.purge_results(jobs=jobs, targets=targets)
+
+ @spin_after
+ def get_result(self, indices_or_msg_ids=None):
+ """return one or more results, specified by history index or msg_id.
+ See client.get_result for details.
+
+ """
+
+ if indices_or_msg_ids is None:
+ indices_or_msg_ids = -1
+ if isinstance(indices_or_msg_ids, int):
+ indices_or_msg_ids = self.history[indices_or_msg_ids]
+ elif isinstance(indices_or_msg_ids, (list,tuple,set)):
+ indices_or_msg_ids = list(indices_or_msg_ids)
+ for i,index in enumerate(indices_or_msg_ids):
+ if isinstance(index, int):
+ indices_or_msg_ids[i] = self.history[index]
+ return self.client.get_result(indices_or_msg_ids)
+
#-------------------------------------------------------------------
# Map
#-------------------------------------------------------------------
@@ -261,7 +290,7 @@ def map_async(self, f, *sequences, **kwargs):
This is equivalent to map(...block=False)
- See `map` for details.
+ See `self.map` for details.
"""
if 'block' in kwargs:
raise TypeError("map_async doesn't take a `block` keyword argument.")
@@ -273,25 +302,33 @@ def map_sync(self, f, *sequences, **kwargs):
This is equivalent to map(...block=True)
- See `map` for details.
+ See `self.map` for details.
"""
if 'block' in kwargs:
raise TypeError("map_sync doesn't take a `block` keyword argument.")
kwargs['block'] = True
return self.map(f,*sequences,**kwargs)
+ def imap(self, f, *sequences, **kwargs):
+ """Parallel version of `itertools.imap`.
+
+ See `self.map` for details.
+ """
+
+ return iter(self.map_async(f,*sequences, **kwargs))
+
#-------------------------------------------------------------------
# Decorators
#-------------------------------------------------------------------
def remote(self, bound=True, block=True):
"""Decorator for making a RemoteFunction"""
- return remote(self.client, bound=bound, targets=self.targets, block=block, balanced=self._balanced)
+ return remote(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)
def parallel(self, dist='b', bound=True, block=None):
"""Decorator for making a ParallelFunction"""
block = self.block if block is None else block
- return parallel(self.client, bound=bound, targets=self.targets, block=block, balanced=self._balanced)
+ return parallel(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)
class DirectView(View):
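
The new imap is a thin wrapper around map_async that leans on the asynchronous iterator added in asyncresult.py, so results can be consumed as they complete. A hedged sketch with a load-balanced view:

    lview = rc.view()         # a LoadBalancedView, per the docs change below
    for r in lview.imap(lambda x: x ** 2, range(8)):
        print r               # each element is printed as soon as its task finishes
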
@@ -320,7 +357,9 @@ def __init__(self, client=None, targets=None):
@spin_after
@save_ids
def map(self, f, *sequences, **kwargs):
- """Parallel version of builtin `map`, using this View's `targets`.
+ """view.map(f, *sequences, block=self.block, bound=self.bound) => list|AsyncMapResult
+
+ Parallel version of builtin `map`, using this View's `targets`.
There will be one task per target, so work will be chunked
if the sequences are longer than `targets`.
@@ -337,7 +376,7 @@ def map(self, f, *sequences, **kwargs):
block : bool
whether to wait for the result or not [default self.block]
bound : bool
- whether to wait for the result or not [default self.bound]
+ whether to have access to the engines' namespaces [default self.bound]
Returns
-------
@@ -347,7 +386,8 @@ def map(self, f, *sequences, **kwargs):
An object like AsyncResult, but which reassembles the sequence of results
into a single list. AsyncMapResults can be iterated through before all
results are complete.
- else:
+ else:
+ list
the result of map(f,*sequences)
"""
@@ -359,18 +399,18 @@ def map(self, f, *sequences, **kwargs):
assert len(sequences) > 0, "must have some sequences to map onto!"
pf = ParallelFunction(self.client, f, block=block, bound=bound,
- targets=self.targets, balanced=False)
+ targets=self._targets, balanced=False)
return pf.map(*sequences)
@sync_results
@save_ids
def execute(self, code, block=True):
"""execute some code on my targets."""
- return self.client.execute(code, block=block, targets=self.targets)
+ return self.client.execute(code, block=block, targets=self._targets)
def update(self, ns):
"""update remote namespace with dict `ns`"""
- return self.client.push(ns, targets=self.targets, block=self.block)
+ return self.client.push(ns, targets=self._targets, block=self.block)
push = update
@@ -379,7 +419,7 @@ def get(self, key_s):
will return one object if it is a key.
It also takes a list of keys, and will return a list of objects."""
# block = block if block is not None else self.block
- return self.client.pull(key_s, block=True, targets=self.targets)
+ return self.client.pull(key_s, block=True, targets=self._targets)
@sync_results
@save_ids
@@ -388,14 +428,14 @@ def pull(self, key_s, block=True):
will return one object if it is a key.
It also takes a list of keys, and will return a list of objects."""
block = block if block is not None else self.block
- return self.client.pull(key_s, block=block, targets=self.targets)
+ return self.client.pull(key_s, block=block, targets=self._targets)
def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
"""
Partition a Python sequence and send the partitions to a set of engines.
"""
block = block if block is not None else self.block
- targets = targets if targets is not None else self.targets
+ targets = targets if targets is not None else self._targets
return self.client.scatter(key, seq, dist=dist, flatten=flatten,
targets=targets, block=block)
@@ -407,7 +447,7 @@ def gather(self, key, dist='b', targets=None, block=None):
Gather a partitioned sequence on a set of engines as a single local seq.
"""
block = block if block is not None else self.block
- targets = targets if targets is not None else self.targets
+ targets = targets if targets is not None else self._targets
return self.client.gather(key, dist=dist, targets=targets, block=block)
@@ -420,12 +460,12 @@ def __setitem__(self,key, value):
def clear(self, block=False):
"""Clear the remote namespaces on my engines."""
block = block if block is not None else self.block
- return self.client.clear(targets=self.targets, block=block)
+ return self.client.clear(targets=self._targets, block=block)
def kill(self, block=True):
"""Kill my engines."""
block = block if block is not None else self.block
- return self.client.kill(targets=self.targets, block=block)
+ return self.client.kill(targets=self._targets, block=block)
#----------------------------------------
# activate for %px,%autopx magics
@@ -504,9 +544,9 @@ def _validate_dependency(self, dep):
def set_flags(self, **kwargs):
"""set my attribute flags by keyword.
- A View is a wrapper for the Client's apply method, but
- with attributes that specify keyword arguments, those attributes
- can be set by keyword argument with this method.
+ A View is a wrapper for the Client's apply method, but with attributes
+ that specify keyword arguments, those attributes can be set by keyword
+ argument with this method.
Parameters
----------
@@ -543,10 +583,15 @@ def set_flags(self, **kwargs):
@spin_after
@save_ids
def map(self, f, *sequences, **kwargs):
- """Parallel version of builtin `map`, load-balanced by this View.
+ """view.map(f, *sequences, block=self.block, bound=self.bound, chunk_size=1) => list|AsyncMapResult
+
+ Parallel version of builtin `map`, load-balanced by this View.
- Each element will be a separate task, and will be load-balanced. This
- lets individual elements be available for iteration as soon as they arrive.
+ `block`, `bound`, and `chunk_size` can be specified by keyword only.
+
+ Each `chunk_size` elements will be a separate task, and will be
+ load-balanced. This lets individual elements be available for iteration
+ as soon as they arrive.
Parameters
----------
@@ -558,7 +603,9 @@ def map(self, f, *sequences, **kwargs):
block : bool
whether to wait for the result or not [default self.block]
bound : bool
- whether to use the engine's namespace
+ whether to use the engine's namespace [default self.bound]
+ chunk_size : int
+ how many elements should be in each task [default 1]
Returns
-------
@@ -586,7 +633,7 @@ def map(self, f, *sequences, **kwargs):
assert len(sequences) > 0, "must have some sequences to map onto!"
pf = ParallelFunction(self.client, f, block=block, bound=bound,
- targets=self.targets, balanced=True,
+ targets=self._targets, balanced=True,
chunk_size=chunk_size)
return pf.map(*sequences)
docs/source/parallelz/parallel_multiengine.txt (184 lines changed)
@@ -59,13 +59,24 @@ of engine ids:
Here we see that there are four engines ready to do work for us.
+For direct execution, we will make use of a :class:`DirectView` object, which can be
+constructed via list-access to the client:
+
+.. sourcecode:: ipython
+
+ In [4]: dview = rc[:] # use all engines
+
+.. seealso::
+
+ For more information, see the in-depth explanation of :ref:`Views <parallel_view>`.
+
+
Quick and easy parallelism
==========================
In many cases, you simply want to apply a Python function to a sequence of
objects, but *in parallel*. The client interface provides a simple way
-of accomplishing this: using the builtin :func:`map` and the ``@remote``
-function decorator, or the client's :meth:`map` method.
+of accomplishing this: using the DirectView's :meth:`~DirectView.map` method.
Parallel map
------------
@@ -79,44 +90,67 @@ DirectView's :meth:`map` method:
.. sourcecode:: ipython
In [62]: serial_result = map(lambda x:x**10, range(32))
+
+ In [63]: dview.block = True
+
+ In [66]: parallel_result = dview.map(lambda x: x**10, range(32))
- In [66]: parallel_result = rc[:].map(lambda x: x**10, range(32))
-
- In [67]: serial_result==parallel_result.get()
+ In [67]: serial_result==parallel_result
Out[67]: True
.. note::
The :class:`DirectView`'s version of :meth:`map` does
- not do any load balancing. For a load balanced version, use a
+ not do dynamic load balancing. For a load balanced version, use a
:class:`LoadBalancedView`, or a :class:`ParallelFunction` with
`balanced=True`.
.. seealso::
- :meth:`map` is implemented via :class:`.ParallelFunction`.
+ :meth:`map` is implemented via :class:`ParallelFunction`.
-Remote function decorator
--------------------------
+Remote function decorators
+--------------------------
Remote functions are just like normal functions, but when they are called,
they execute on one or more engines, rather than locally. IPython provides
-some decorators:
+two decorators:
.. sourcecode:: ipython
- In [10]: @rc.remote(block=True, targets=0)
- ....: def f(x):
- ....: return 10.0*x**4
- ....:
+ In [10]: @rc.remote(block=True, targets='all')
+ ...: def getpid():
+ ...: import os
+ ...: return os.getpid()
+ ...:
+
+ In [11]: getpid()
+ Out[11]: [12345, 12346, 12347, 12348]
- In [11]: map(f, range(32)) # this is done on engine 0
- Out[11]: [0.0,10.0,160.0,...]
+The ``@parallel`` decorator creates parallel functions that break up element-wise
+operations, distribute the pieces across engines, and reconstruct the result.
+
+.. sourcecode:: ipython
+
+ In [12]: import numpy as np
+
+ In [13]: A = np.random.random((64,48))
+
+ In [14]: @rc.parallel(block=True, targets='all')
+ ...: def pmul(A,B):
+ ...: return A*B
+
+ In [15]: C_local = A*A
+
+ In [16]: C_remote_partial = pmul(A,A)
+
+ In [17]: (C_local == C_remote_partial).all()
+ Out[17]: True
.. seealso::
- See the docstring for the :func:`parallel` and :func:`remote` decorators for
+ See the docstrings for the :func:`parallel` and :func:`remote` decorators for
options.
Calling Python functions
@@ -152,7 +186,7 @@ the extra arguments. For instance, performing index-access on a client creates a
Out[4]: <DirectView [1, 2]>
In [5]: view.apply<tab>
- view.apply view.apply_async view.apply_async_bound view.apply_bound view.apply_sync view.apply_sync_bound
+ view.apply view.apply_async view.apply_async_bound view.apply_sync view.apply_sync_bound
A :class:`DirectView` always uses its `targets` attribute, and it will use its `bound`
and `block` attributes in its :meth:`apply` method, but the suffixed :meth:`apply_x`
@@ -180,22 +214,22 @@ blocks until the engines are done executing the command:
.. sourcecode:: ipython
- In [2]: rc.block=True
- In [3]: dview = rc[:] # A DirectView of all engines
+ In [2]: dview = rc[:] # A DirectView of all engines
+ In [3]: dview.block=True
In [4]: dview['a'] = 5
In [5]: dview['b'] = 10
In [6]: dview.apply_bound(lambda x: a+b+x, 27)
Out[6]: [42, 42, 42, 42]
-Python commands can be executed on specific engines by calling execute using
-the ``targets`` keyword argument, or creating a :class:`DirectView` instance
-by index-access to the client:
+Python commands can be executed on specific engines by calling execute using the ``targets``
+keyword argument in :meth:`client.execute`, or creating a :class:`DirectView` instance by
+index-access to the client:
.. sourcecode:: ipython
- In [6]: rc[::2].execute('c=a+b') # shorthand for rc.execute('c=a+b',targets=[0,2])
+ In [6]: rc.execute('c=a+b', targets=[0,2])
In [7]: rc[1::2].execute('c=a-b') # shorthand for rc.execute('c=a-b',targets=[1,3])
@@ -214,21 +248,23 @@ by index-access to the client:
:meth:`View.apply(f,*args,**kwargs)`, which simply calls
``f(*args,**kwargs)`` remotely.
-This example also shows one of the most important things about the IPython
+Bound and unbound execution
+---------------------------
+
+The previous example also shows one of the most important things about the IPython
engines: they have persistent user namespaces. The :meth:`apply` method can
-be run in either a bound or unbound way. The default for a View is to be
-unbound, unless called by the :meth:`apply_bound` method:
+be run in either a bound or unbound manner:
.. sourcecode:: ipython
In [9]: dview['b'] = 5 # assign b to 5 everywhere
In [10]: v0 = rc[0]
- In [12]: v0.apply_bound(lambda : b)
+ In [12]: v0.apply_sync_bound(lambda : b)
Out[12]: 5
- In [13]: v0.apply(lambda : b)
+ In [13]: v0.apply_sync(lambda : b)
---------------------------------------------------------------------------
RemoteError Traceback (most recent call last)
/home/you/<ipython-input-34-21a468eb10f0> in <module>()
@@ -244,10 +280,10 @@ unbound, unless called by the :meth:`apply_bound` method:
Specifically, `bound=True` specifies that the engine's namespace is to be used
-for execution, and `bound=False` specifies that the engine's namespace is not
-to be used (hence, 'b' is undefined during unbound execution, since the
-function is called in an empty namespace). Unbound execution is often useful
-for large numbers of atomic tasks, which prevents bloating the engine's
+as the `globals` when the function is called, and `bound=False` specifies that
+the engine's namespace is not to be used (hence, 'b' is undefined during unbound
+execution, since the function is called in an empty namespace). Unbound execution is
+often useful for large numbers of atomic tasks, which prevents bloating the engine's
memory, while bound execution lets you build on your previous work.
@@ -257,7 +293,7 @@ Non-blocking execution
In non-blocking mode, :meth:`apply` submits the command to be executed and
then returns a :class:`AsyncResult` object immediately. The
:class:`AsyncResult` object gives you a way of getting a result at a later
-time through its :meth:`get` method.
+time through its :meth:`get` method.
.. Note::
@@ -280,25 +316,25 @@ local Python/IPython session:
...: return time.time()-tic
# In non-blocking mode
- In [7]: pr = dview.apply_async(wait, 2)
+ In [7]: ar = dview.apply_async(wait, 2)
# Now block for the result
- In [8]: pr.get()
+ In [8]: ar.get()
Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154]
# Again in non-blocking mode
- In [9]: pr = dview.apply_async(wait, 10)
+ In [9]: ar = dview.apply_async(wait, 10)
# Poll to see if the result is ready
- In [10]: pr.ready()
+ In [10]: ar.ready()
Out[10]: False
# ask for the result, but wait a maximum of 1 second:
- In [45]: pr.get(1)
+ In [45]: ar.get(1)
---------------------------------------------------------------------------
TimeoutError Traceback (most recent call last)
/home/you/<ipython-input-45-7cd858bbb8e0> in <module>()
- ----> 1 pr.get(1)
+ ----> 1 ar.get(1)
/path/to/site-packages/IPython/zmq/parallel/asyncresult.pyc in get(self, timeout)
62 raise self._exception
@@ -316,8 +352,8 @@ local Python/IPython session:
Often, it is desirable to wait until a set of :class:`AsyncResult` objects
are done. For this, there is a the method :meth:`barrier`. This method takes a
-tuple of :class:`AsyncResult` objects (or `msg_ids`) and blocks until all of the
-associated results are ready:
+tuple of :class:`AsyncResult` objects (or `msg_ids` or indices to the client's History),
+and blocks until all of the associated results are ready:
.. sourcecode:: ipython
@@ -338,15 +374,17 @@ associated results are ready:
The ``block`` keyword argument and attributes
---------------------------------------------
-Most methods(like :meth:`apply`) accept
+Most client methods (like :meth:`apply`) accept
``block`` as a keyword argument. As we have seen above, these
-keyword arguments control the blocking mode . The :class:`Client` class also has
+keyword arguments control the blocking mode. The :class:`Client` class also has
a :attr:`block` attribute that controls the default behavior when the keyword
argument is not provided. Thus the following logic is used for :attr:`block`:
* If no keyword argument is provided, the instance attributes are used.
* Keyword arguments, if provided, override the instance attributes for
the duration of a single call.
+
+DirectView objects also have a ``bound`` attribute, which is used in the same way.
The following examples demonstrate how to use the instance attributes:
@@ -365,7 +403,7 @@ The following examples demonstrate how to use the instance attributes:
In [22]: rc.apply(lambda : 42, targets='all')
Out[22]: [42, 42, 42, 42]
-The :attr:`block` and :attr:`targets` instance attributes of the
+The :attr:`block`, :attr:`bound`, and :attr:`targets` instance attributes of the
:class:`.DirectView` also determine the behavior of the parallel magic commands.
@@ -381,9 +419,9 @@ Parallel magic commands
We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
that make it more pleasant to execute Python commands on the engines
interactively. These are simply shortcuts to :meth:`execute` and
-:meth:`get_result`. The ``%px`` magic executes a single Python command on the
-engines specified by the :attr:`targets` attribute of the
-:class:`MultiEngineClient` instance (by default this is ``'all'``):
+:meth:`get_result` of the :class:`DirectView`. The ``%px`` magic executes a single
+Python command on the engines specified by the :attr:`targets` attribute of the
+:class:`DirectView` instance:
.. sourcecode:: ipython
@@ -399,7 +437,6 @@ engines specified by the :attr:`targets` attribute of the
In [26]: %px import numpy
Parallel execution on engines: [0, 1, 2, 3]
- Out[26]:[None,None,None,None]
In [27]: %px a = numpy.random.rand(2,2)
Parallel execution on engines: [0, 1, 2, 3]
@@ -408,36 +445,25 @@ engines specified by the :attr:`targets` attribute of the
Parallel execution on engines: [0, 1, 2, 3]
In [28]: dv['ev']
- Out[44]: [ array([ 1.09522024, -0.09645227]),
+ Out[28]: [ array([ 1.09522024, -0.09645227]),
array([ 1.21435496, -0.35546712]),
array([ 0.72180653, 0.07133042]),
array([ 1.46384341e+00, 1.04353244e-04])
]
-.. Note::
-
- ``%result`` doesn't work
-
-The ``%result`` magic gets and prints the stdin/stdout/stderr of the last
-command executed on each engine. It is simply a shortcut to the
+The ``%result`` magic gets the most recent result, or takes an argument
+specifying the index of the result to be requested. It is simply a shortcut to the
:meth:`get_result` method:
.. sourcecode:: ipython
-
- In [29]: %result
- Out[29]:
- <Results List>
- [0] In [10]: print numpy.linalg.eigvals(a)
- [0] Out[10]: [ 1.28167017 0.14197338]
-
- [1] In [9]: print numpy.linalg.eigvals(a)
- [1] Out[9]: [-0.14093616 1.27877273]
-
- [2] In [10]: print numpy.linalg.eigvals(a)
- [2] Out[10]: [-0.37023573 1.06779409]
-
- [3] In [9]: print numpy.linalg.eigvals(a)
- [3] Out[9]: [ 0.83664764 -0.25602658]
+
+ In [29]: dv.apply_async_bound(lambda : ev)
+
+ In [30]: %result
+ Out[30]: [ [ 1.28167017 0.14197338],
+ [-0.14093616 1.27877273],
+ [-0.37023573 1.06779409],
+ [ 0.83664764 -0.25602658] ]
The ``%autopx`` magic switches to a mode where everything you type is executed
on the engines given by the :attr:`targets` attribute:
@@ -477,12 +503,6 @@ on the engines given by the :attr:`targets` attribute:
'Average max eigenvalue is: 10.1158837784',]
-.. Note::
-
- Multiline ``%autpx`` gets fouled up by NameErrors, because IPython
- currently introspects too much.
-
-
Moving Python objects around
============================
@@ -524,14 +544,12 @@ In non-blocking mode :meth:`push` and :meth:`pull` also return
In [47]: rc.block=False
- In [48]: pr = rc.pull('a')
+ In [48]: ar = rc.pull('a')
- In [49]: pr.get()
+ In [49]: ar.get()
Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]
-
-
Dictionary interface
--------------------
@@ -751,9 +769,9 @@ All of this same error handling magic even works in non-blocking mode:
In [83]: rc.block=False
- In [84]: pr = rc.execute('1/0')
+ In [84]: ar = rc.execute('1/0')
- In [85]: pr.get()
+ In [85]: ar.get()
---------------------------------------------------------------------------
CompositeError Traceback (most recent call last)
docs/source/parallelz/parallel_task.txt (15 lines changed)
@@ -33,16 +33,16 @@ Creating a ``Client`` instance
==============================
The first step is to import the IPython :mod:`IPython.zmq.parallel.client`
-module and then create a :class:`.Client` instance:
+module and then create a :class:`.Client` instance, and we will also be using
+a :class:`LoadBalancedView`, here called `lview`:
.. sourcecode:: ipython
In [1]: from IPython.zmq.parallel import client
In [2]: rc = client.Client()
- In [3]: lview = rc.view(balanced=True)
- Out[3]: <LoadBalancedView None>
+ In [3]: lview = rc.view()
This form assumes that the controller was started on localhost with default
@@ -73,14 +73,15 @@ the task interface.
Parallel map
------------
-To load-balance :meth:`map`,simply use a LoadBalancedView, created by asking
-for the ``None`` element:
+To load-balance :meth:`map`, simply use a LoadBalancedView:
.. sourcecode:: ipython
-
+
+ In [62]: lview.block = True
+
In [63]: serial_result = map(lambda x:x**10, range(32))
- In [64]: parallel_result = lview.map(lambda x:x**10, range(32), block=True)
+ In [64]: parallel_result = lview.map(lambda x:x**10, range(32))
In [65]: serial_result==parallel_result
Out[65]: True
