Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

PendingResult->AsyncResult; match multiprocessing.AsyncResult api

  • Loading branch information...
commit cd8465672be199996c6a4443a6b7e924a08f4e84 1 parent 44e6eb1
@minrk minrk authored
View
112 IPython/zmq/parallel/asyncresult.py
@@ -0,0 +1,112 @@
+"""AsyncResult objects for the client"""
+#-----------------------------------------------------------------------------
+# Copyright (C) 2010 The IPython Development Team
+#
+# Distributed under the terms of the BSD License. The full license is in
+# the file COPYING, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+#-----------------------------------------------------------------------------
+# Imports
+#-----------------------------------------------------------------------------
+
+import error
+
+#-----------------------------------------------------------------------------
+# Classes
+#-----------------------------------------------------------------------------
+
+class AsyncResult(object):
+ """Class for representing results of non-blocking calls.
+
+ Provides the same interface as :py:class:`multiprocessing.AsyncResult`.
+ """
+ def __init__(self, client, msg_ids):
+ self._client = client
+ self._msg_ids = msg_ids
+ self._ready = False
+ self._success = None
+
+ def __repr__(self):
+ if self._ready:
+ return "<%s: finished>"%(self.__class__.__name__)
+ else:
+ return "<%s: %r>"%(self.__class__.__name__,self._msg_ids)
+
+
+ def _reconstruct_result(self, res):
+ """
+ Override me in subclasses for turning a list of results
+ into the expected form.
+ """
+ if len(res) == 1:
+ return res[0]
+ else:
+ return res
+
+ def get(self, timeout=-1):
+ """Return the result when it arrives.
+
+ If `timeout` is not ``None`` and the result does not arrive within
+ `timeout` seconds then ``TimeoutError`` is raised. If the
+ remote call raised an exception then that exception will be reraised
+ by get().
+ """
+ if not self.ready():
+ self.wait(timeout)
+
+ if self._ready:
+ if self._success:
+ return self._result
+ else:
+ raise self._exception
+ else:
+ raise error.TimeoutError("Result not ready.")
+
+ def ready(self):
+ """Return whether the call has completed."""
+ if not self._ready:
+ self.wait(0)
+ return self._ready
+
+ def wait(self, timeout=-1):
+ """Wait until the result is available or until `timeout` seconds pass.
+ """
+ if self._ready:
+ return
+ self._ready = self._client.barrier(self._msg_ids, timeout)
+ if self._ready:
+ try:
+ results = map(self._client.results.get, self._msg_ids)
+ results = error.collect_exceptions(results, 'get')
+ self._result = self._reconstruct_result(results)
+ except Exception, e:
+ self._exception = e
+ self._success = False
+ else:
+ self._success = True
+
+
+ def successful(self):
+ """Return whether the call completed without raising an exception.
+
+ Will raise ``AssertionError`` if the result is not ready.
+ """
+ assert self._ready
+ return self._success
+
+class AsyncMapResult(AsyncResult):
+ """Class for representing results of non-blocking gathers.
+
+ This will properly reconstruct the gather.
+ """
+
+ def __init__(self, client, msg_ids, mapObject):
+ self._mapObject = mapObject
+ AsyncResult.__init__(self, client, msg_ids)
+
+ def _reconstruct_result(self, res):
+ """Perform the gather on the actual results."""
+ return self._mapObject.joinPartitions(res)
+
+
View
26 IPython/zmq/parallel/client.py
@@ -29,7 +29,7 @@
from dependency import Dependency, depend, require
import error
import map as Map
-from pendingresult import PendingResult,PendingMapResult
+from asyncresult import AsyncResult, AsyncMapResult
from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
#--------------------------------------------------------------------------
@@ -746,7 +746,7 @@ def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
self.barrier(msg_id)
return self._maybe_raise(self.results[msg_id])
else:
- return PendingResult(self, [msg_id])
+ return AsyncResult(self, [msg_id])
def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
after=None, follow=None):
@@ -776,7 +776,7 @@ def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
if block:
self.barrier(msg_ids)
else:
- return PendingResult(self, msg_ids)
+ return AsyncResult(self, msg_ids)
if len(msg_ids) == 1:
return self._maybe_raise(self.results[msg_ids[0]])
else:
@@ -785,12 +785,24 @@ def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None,
result[target] = self.results[mid]
return error.collect_exceptions(result, f.__name__)
+ #--------------------------------------------------------------------------
+ # Map and decorators
+ #--------------------------------------------------------------------------
+
def map(self, f, *sequences):
"""Parallel version of builtin `map`, using all our engines."""
pf = ParallelFunction(self, f, block=self.block,
bound=True, targets='all')
return pf.map(*sequences)
+ def parallel(self, bound=True, targets='all', block=True):
+ """Decorator for making a ParallelFunction"""
+ return parallel(self, bound=bound, targets=targets, block=block)
+
+ def remote(self, bound=True, targets='all', block=True):
+ """Decorator for making a RemoteFunction"""
+ return remote(self, bound=bound, targets=targets, block=block)
+
#--------------------------------------------------------------------------
# Data movement
#--------------------------------------------------------------------------
@@ -831,7 +843,7 @@ def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
else:
mid = self.push({key: partition}, targets=engineid, block=False)
msg_ids.append(mid)
- r = PendingResult(self, msg_ids)
+ r = AsyncResult(self, msg_ids)
if block:
r.wait()
return
@@ -850,7 +862,7 @@ def gather(self, key, dist='b', targets='all', block=True):
for index, engineid in enumerate(targets):
msg_ids.append(self.pull(key, targets=engineid,block=False))
- r = PendingMapResult(self, msg_ids, mapObject)
+ r = AsyncMapResult(self, msg_ids, mapObject)
if block:
r.wait()
return r.result
@@ -1002,6 +1014,6 @@ def spin(self):
'ParallelFunction',
'DirectView',
'LoadBalancedView',
- 'PendingResult',
- 'PendingMapResult'
+ 'AsyncResult',
+ 'AsyncMapResult'
]
View
3  IPython/zmq/parallel/error.py
@@ -145,6 +145,9 @@ class SecurityError(KernelError):
class FileTimeoutError(KernelError):
pass
+class TimeoutError(KernelError):
+ pass
+
class RemoteError(KernelError):
"""Error raised elsewhere"""
ename=None
View
4 IPython/zmq/parallel/heartmonitor.py
@@ -9,7 +9,7 @@
import uuid
import zmq
-from zmq.devices import ProcessDevice
+from zmq.devices import ProcessDevice,ThreadDevice
from zmq.eventloop import ioloop, zmqstream
#internal
@@ -27,7 +27,7 @@ class Heart(object):
device=None
id=None
def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None):
- self.device = ProcessDevice(zmq.FORWARDER, in_type, out_type)
+ self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
self.device.daemon=True
self.device.connect_in(in_addr)
self.device.connect_out(out_addr)
View
75 IPython/zmq/parallel/pendingresult.py
@@ -1,75 +0,0 @@
-"""PendingResult objects for the client"""
-#-----------------------------------------------------------------------------
-# Copyright (C) 2010 The IPython Development Team
-#
-# Distributed under the terms of the BSD License. The full license is in
-# the file COPYING, distributed as part of this software.
-#-----------------------------------------------------------------------------
-
-#-----------------------------------------------------------------------------
-# Imports
-#-----------------------------------------------------------------------------
-
-import error
-
-#-----------------------------------------------------------------------------
-# Classes
-#-----------------------------------------------------------------------------
-
-class PendingResult(object):
- """Class for representing results of non-blocking calls."""
- def __init__(self, client, msg_ids):
- self.client = client
- self.msg_ids = msg_ids
- self._result = None
- self.done = False
-
- def __repr__(self):
- if self.done:
- return "<%s: finished>"%(self.__class__.__name__)
- else:
- return "<%s: %r>"%(self.__class__.__name__,self.msg_ids)
-
- @property
- def result(self):
- if self._result is not None:
- return self._result
- if not self.done:
- self.wait(0)
- if self.done:
- results = map(self.client.results.get, self.msg_ids)
- results = error.collect_exceptions(results, 'get_result')
- self._result = self.reconstruct_result(results)
- return self._result
- else:
- raise error.ResultNotCompleted
-
- def reconstruct_result(self, res):
- """
- Override me in subclasses for turning a list of results
- into the expected form.
- """
- if len(res) == 1:
- return res[0]
- else:
- return res
-
- def wait(self, timout=-1):
- self.done = self.client.barrier(self.msg_ids)
- return self.done
-
-class PendingMapResult(PendingResult):
- """Class for representing results of non-blocking gathers.
-
- This will properly reconstruct the gather.
- """
-
- def __init__(self, client, msg_ids, mapObject):
- self.mapObject = mapObject
- PendingResult.__init__(self, client, msg_ids)
-
- def reconstruct_result(self, res):
- """Perform the gather on the actual results."""
- return self.mapObject.joinPartitions(res)
-
-
View
6 IPython/zmq/parallel/remotefunction.py
@@ -11,7 +11,7 @@
#-----------------------------------------------------------------------------
import map as Map
-from pendingresult import PendingMapResult
+from asyncresult import AsyncMapResult
#-----------------------------------------------------------------------------
# Decorators
@@ -126,10 +126,10 @@ def __call__(self, *sequences):
f=self.func
mid = self.client.apply(f, args=args, block=False,
bound=self.bound,
- targets=engineid).msg_ids[0]
+ targets=engineid)._msg_ids[0]
msg_ids.append(mid)
- r = PendingMapResult(self.client, msg_ids, self.mapObject)
+ r = AsyncMapResult(self.client, msg_ids, self.mapObject)
if self.block:
r.wait()
return r.result
Please sign in to comment.
Something went wrong with that request. Please try again.