Permalink
Browse files

update API after sagedays29

tests, docs updated to match

* Client no longer has high-level methods (only in Views)
* module functions can be pushed
* clients can have a connection timeout
* dependencies have separate switches for success/failure, not just success_only
* add `with view.temp_flags(**flags):` for temporary flags

Also updated some docs and examples
  • Loading branch information...
minrk committed Mar 25, 2011
1 parent 11592a7 commit e90463bace52cbdc9e2988bc2889e21757677057
Showing with 1,948 additions and 1,326 deletions.
  1. +14 −4 IPython/utils/pickleutil.py
  2. +30 −12 IPython/zmq/parallel/asyncresult.py
  3. +195 −443 IPython/zmq/parallel/client.py
  4. +58 −33 IPython/zmq/parallel/dependency.py
  5. +2 −6 IPython/zmq/parallel/hub.py
  6. +53 −56 IPython/zmq/parallel/remotefunction.py
  7. +11 −9 IPython/zmq/parallel/scheduler.py
  8. +16 −20 IPython/zmq/parallel/streamkernel.py
  9. +27 −6 IPython/zmq/parallel/tests/__init__.py
  10. +24 −11 IPython/zmq/parallel/tests/clienttest.py
  11. +69 −0 IPython/zmq/parallel/tests/test_asyncresult.py
  12. +58 −173 IPython/zmq/parallel/tests/test_client.py
  13. 0 IPython/zmq/parallel/tests/test_controller.py
  14. +101 −0 IPython/zmq/parallel/tests/test_dependency.py
  15. +22 −1 IPython/zmq/parallel/tests/test_newserialized.py
  16. +12 −0 IPython/zmq/parallel/tests/test_streamsession.py
  17. +287 −0 IPython/zmq/parallel/tests/test_view.py
  18. +36 −0 IPython/zmq/parallel/util.py
  19. +430 −180 IPython/zmq/parallel/view.py
  20. +11 −10 docs/examples/newparallel/dagdeps.py
  21. +36 −24 docs/examples/newparallel/demo/dependencies.py
  22. +3 −2 docs/examples/newparallel/demo/map.py
  23. +0 −22 docs/examples/newparallel/demo/remotefunction.py
  24. +4 −4 docs/examples/newparallel/demo/views.py
  25. +97 −0 docs/examples/newparallel/fetchparse.py
  26. +19 −0 docs/examples/newparallel/helloworld.py
  27. +2 −2 docs/examples/newparallel/mcdriver.py
  28. +3 −3 docs/examples/newparallel/parallelpi.py
  29. +9 −7 docs/examples/newparallel/wave2D/parallelwave-mpi.py
  30. +9 −7 docs/examples/newparallel/wave2D/parallelwave.py
  31. +1 −1 docs/source/index.txt
  32. +5 −4 docs/source/parallelz/dag_dependencies.txt
  33. +1 −0 docs/source/parallelz/index.txt
  34. +3 −3 docs/source/parallelz/parallel_demos.txt
  35. +101 −46 docs/source/parallelz/parallel_details.txt
  36. +10 −9 docs/source/parallelz/parallel_intro.txt
  37. +158 −204 docs/source/parallelz/parallel_multiengine.txt
  38. +31 −24 docs/source/parallelz/parallel_task.txt
@@ -15,10 +15,9 @@
# Imports
#-------------------------------------------------------------------------------
-from types import FunctionType
import copy
-
-from IPython.zmq.parallel.dependency import dependent
+import sys
+from types import FunctionType
import codeutil
@@ -67,12 +66,22 @@ def __init__(self, f):
self._checkType(f)
self.code = f.func_code
self.defaults = f.func_defaults
+ self.module = f.__module__ or '__main__'
self.__name__ = f.__name__
def _checkType(self, obj):
assert isinstance(obj, FunctionType), "Not a function type"
def getObject(self, g=None):
+ # try to load function back into its module:
+ if not self.module.startswith('__'):
+ try:
+ __import__(self.module)
+ except ImportError:
+ pass
+ else:
+ g = sys.modules[self.module].__dict__
+
if g is None:
g = globals()
newFunc = FunctionType(self.code, g, self.__name__, self.defaults)
@@ -82,8 +91,9 @@ def getObject(self, g=None):
# Functions
#-------------------------------------------------------------------------------
-
def can(obj):
+ # import here to prevent module-level circular imports
+ from IPython.zmq.parallel.dependency import dependent
if isinstance(obj, dependent):
keys = ('f','df')
return CannedObject(obj, keys=keys)
@@ -12,13 +12,18 @@
import time
+from zmq import MessageTracker
+
from IPython.external.decorator import decorator
from . import error
#-----------------------------------------------------------------------------
# Classes
#-----------------------------------------------------------------------------
+# global empty tracker that's always done:
+finished_tracker = MessageTracker()
+
@decorator
def check_ready(f, self, *args, **kwargs):
"""Call spin() to sync state prior to calling the method."""
@@ -36,18 +41,26 @@ class AsyncResult(object):
msg_ids = None
_targets = None
_tracker = None
+ _single_result = False
def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
- self._client = client
if isinstance(msg_ids, basestring):
+ # always a list
msg_ids = [msg_ids]
+ if tracker is None:
+ # default to always done
+ tracker = finished_tracker
+ self._client = client
self.msg_ids = msg_ids
self._fname=fname
self._targets = targets
self._tracker = tracker
self._ready = False
self._success = None
- self._single_result = len(msg_ids) == 1
+ if len(msg_ids) == 1:
+ self._single_result = not isinstance(targets, (list, tuple))
+ else:
+ self._single_result = False
def __repr__(self):
if self._ready:
@@ -99,7 +112,7 @@ def wait(self, timeout=-1):
"""
if self._ready:
return
- self._ready = self._client.barrier(self.msg_ids, timeout)
+ self._ready = self._client.wait(self.msg_ids, timeout)
if self._ready:
try:
results = map(self._client.results.get, self.msg_ids)
@@ -149,10 +162,9 @@ def get_dict(self, timeout=-1):
return dict(zip(engine_ids,results))
@property
- @check_ready
def result(self):
"""result property wrapper for `get(timeout=0)`."""
- return self._result
+ return self.get()
# abbreviated alias:
r = result
@@ -169,7 +181,7 @@ def metadata(self):
@property
def result_dict(self):
"""result property as a dict."""
- return self.get_dict(0)
+ return self.get_dict()
def __dict__(self):
return self.get_dict(0)
@@ -181,11 +193,17 @@ def abort(self):
@property
def sent(self):
- """check whether my messages have been sent"""
- if self._tracker is None:
- return True
- else:
- return self._tracker.done
+ """check whether my messages have been sent."""
+ return self._tracker.done
+
+ def wait_for_send(self, timeout=-1):
+ """wait for pyzmq send to complete.
+
+ This is necessary when sending arrays that you intend to edit in-place.
+ `timeout` is in seconds, and will raise TimeoutError if it is reached
+ before the send completes.
+ """
+ return self._tracker.wait(timeout)
#-------------------------------------
# dict-access
@@ -285,7 +303,7 @@ def wait(self, timeout=-1):
if self._ready:
return
local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
- local_ready = self._client.barrier(local_ids, timeout)
+ local_ready = self._client.wait(local_ids, timeout)
if local_ready:
remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
if not remote_ids:
Oops, something went wrong.

0 comments on commit e90463b

Please sign in to comment.