
support iterating through map results as they arrive

minrk committed Feb 21, 2011
1 parent d865e85 commit 46d7c9dafca1f430834d908792876a0af624c996
Showing with 103 additions and 5 deletions.
  1. +24 −0 IPython/zmq/parallel/asyncresult.py
  2. +75 −3 IPython/zmq/parallel/client.py
  3. +4 −2 IPython/zmq/parallel/streamkernel.py
@@ -35,6 +35,8 @@ class AsyncResult(object):
     def __init__(self, client, msg_ids, fname=''):
         self._client = client
+        if isinstance(msg_ids, basestring):
+            msg_ids = [msg_ids]
         self.msg_ids = msg_ids
         self._fname=fname
         self._ready = False
@@ -204,5 +206,27 @@ def _reconstruct_result(self, res):
"""Perform the gather on the actual results."""
return self._mapObject.joinPartitions(res)
+ # asynchronous iterator:
+ def __iter__(self):
+ try:
+ rlist = self.get(0)
+ except error.TimeoutError:
+ # wait for each result individually
+ for msg_id in self.msg_ids:
+ ar = AsyncResult(self._client, msg_id, self._fname)
+ rlist = ar.get()
+ try:
+ for r in rlist:
+ yield r
+ except TypeError:
+ # flattened, not a list
+ # this could get broken by flattened data that returns iterables
+ # but most calls to map do not expose the `flatten` argument
+ yield rlist
+ else:
+ # already done
+ for r in rlist:
+ yield r
+
__all__ = ['AsyncResult', 'AsyncMapResult']
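
A rough usage sketch of the new iterator (assuming a running IPython.zmq.parallel cluster with default connection settings; `slow_square` is illustrative, not part of this commit). With this change, iterating an AsyncMapResult no longer requires waiting for the whole map to finish:

from IPython.zmq.parallel.client import Client

def slow_square(x):
    import time
    time.sleep(1)
    return x*x

rc = Client()                                      # connect to a running cluster
amr = rc.map(slow_square, range(8), block=False)   # returns an AsyncMapResult
for result in amr:
    # results arrive one chunk (task) at a time, but are yielded element
    # by element, instead of blocking until the entire map is complete
    print(result)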
@@ -1098,10 +1098,82 @@ def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None):
     # Map and decorators
     #--------------------------------------------------------------------------
-    def map(self, f, *sequences):
-        """Parallel version of builtin `map`, using all our engines."""
+    def map(self, f, *sequences, **kwargs):
+        """Parallel version of builtin `map`, using all our engines.
+
+        `block` and `targets` can be passed as keyword arguments only.
+
+        There will be one task per target, so work will be chunked
+        if the sequences are longer than `targets`.
+
+        Results can be iterated as they are ready, but will become available in chunks.
+
+        Parameters
+        ----------
+
+        f : callable
+            function to be mapped
+        *sequences: one or more sequences of matching length
+            the sequences to be distributed and passed to `f`
+        block : bool
+            whether to wait for the result or not [default self.block]
+        targets : valid targets
+            targets to be used [default self.targets]
+
+        Returns
+        -------
+
+        if block=False:
+            AsyncMapResult
+                An object like AsyncResult, but which reassembles the sequence of results
+                into a single list. AsyncMapResults can be iterated through before all
+                results are complete.
+        else:
+            the result of map(f,*sequences)
+
+        """
+        block = kwargs.get('block', self.block)
+        targets = kwargs.get('targets', self.targets)
+        assert len(sequences) > 0, "must have some sequences to map onto!"
+        pf = ParallelFunction(self, f, block=block,
+                              bound=True, targets=targets)
+        return pf.map(*sequences)
+
+    def imap(self, f, *sequences, **kwargs):
+        """Parallel version of builtin `itertools.imap`, load-balanced across all engines.
+
+        Each element will be a separate task, and will be load-balanced. This
+        lets individual elements be ready for iteration as soon as they come.
+
+        Parameters
+        ----------
+
+        f : callable
+            function to be mapped
+        *sequences: one or more sequences of matching length
+            the sequences to be distributed and passed to `f`
+        block : bool
+            whether to wait for the result or not [default self.block]
+
+        Returns
+        -------
+
+        if block=False:
+            AsyncMapResult
+                An object like AsyncResult, but which reassembles the sequence of results
+                into a single list. AsyncMapResults can be iterated through before all
+                results are complete.
+        else:
+            the result of map(f,*sequences)
+
+        """
+
+        block = kwargs.get('block', self.block)
+
+        assert len(sequences) > 0, "must have some sequences to map onto!"
+
         pf = ParallelFunction(self, f, block=self.block,
-                              bound=True, targets='all')
+                              bound=True, targets=None)
         return pf.map(*sequences)
 
     def parallel(self, bound=True, targets='all', block=True):
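
A hedged sketch of how the two entry points divide work (the cluster setup and the `double` function are assumptions for illustration, not part of this commit): `map` creates one task per target, so results arrive in chunks, while `imap` submits one load-balanced task per element, so results become available individually.

from IPython.zmq.parallel.client import Client

def double(x):
    return 2*x

rc = Client()                        # connect to a running cluster

# map: one task per target; iteration yields elements as each chunk finishes
for y in rc.map(double, range(100), block=False):
    print(y)

# imap: load-balanced, one task per element; elements arrive individually
for y in rc.imap(double, range(100)):
    print(y)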
@@ -283,7 +283,9 @@ def complete(self, msg):
         return self.completer.complete(msg.content.line, msg.content.text)
 
     def apply_request(self, stream, ident, parent):
-        # print (parent)
+        # flush previous reply, so this request won't block it
+        stream.flush(zmq.POLLOUT)
+
         try:
             content = parent[u'content']
             bufs = parent[u'buffers']
@@ -354,7 +356,7 @@ def apply_request(self, stream, ident, parent):
         reply_msg = self.session.send(stream, u'apply_reply', reply_content,
                     parent=parent, ident=ident,buffers=result_buf, subheader=sub)
-
+
         # if reply_msg['content']['status'] == u'error':
         #     self.abort_queues()

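The streamkernel change above flushes any pending outgoing reply before running the (potentially long) apply call. A minimal sketch of the same pattern using pyzmq's ZMQStream; the socket setup and handler below are illustrative only, not the kernel's actual code:

import time
import zmq
from zmq.eventloop import ioloop, zmqstream

ctx = zmq.Context()
sock = ctx.socket(zmq.ROUTER)
sock.bind('tcp://127.0.0.1:5555')
stream = zmqstream.ZMQStream(sock)

def on_request(msg):
    # push anything still queued on the send side (e.g. the previous reply)
    # out the wire before starting slow work, so it isn't delayed
    stream.flush(zmq.POLLOUT)
    time.sleep(1)                                  # stand-in for user code
    stream.send_multipart([msg[0], b'done'])       # reply is queued for sending

stream.on_recv(on_request)
ioloop.IOLoop.instance().start()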