Browse files

view decorators for syncing history/results

  • Loading branch information...
1 parent 69e9c2c commit a329cbf8a663c5a5fb9f7b01d680cc7a0368bf10 @minrk minrk committed Oct 15, 2010
Showing with 57 additions and 14 deletions.
  1. +57 −14 IPython/zmq/parallel/view.py
View
71 IPython/zmq/parallel/view.py
@@ -12,34 +12,68 @@ def myblock(f, self, *args, **kwargs):
self.client.block = block
return ret
+@decorator
+def save_ids(f, self, *args, **kwargs):
+ ret = f(self, *args, **kwargs)
+ msg_ids = self.client.history[-self._ntargets:]
+ self.history.extend(msg_ids)
+ map(self.outstanding.add, msg_ids)
+ return ret
+
+@decorator
+def sync_results(f, self, *args, **kwargs):
+ ret = f(self, *args, **kwargs)
+ delta = self.outstanding.difference(self.client.outstanding)
+ completed = self.outstanding.intersection(delta)
+ self.outstanding = self.outstanding.difference(completed)
+ for msg_id in completed:
+ self.results[msg_id] = self.client.results[msg_id]
+ return ret
+
+@decorator
+def spin_after(f, self, *args, **kwargs):
+ ret = f(self, *args, **kwargs)
+ self.spin()
+ return ret
+
+
class View(object):
"""Base View class"""
_targets = None
+ _ntargets = None
block=None
+ history=None
def __init__(self, client, targets):
self.client = client
self._targets = targets
+ self._ntargets = 1 if isinstance(targets, int) else len(targets)
self.block = client.block
-
+ self.history = []
+ self.outstanding = set()
+ self.results = {}
+
def __repr__(self):
strtargets = str(self._targets)
if len(strtargets) > 16:
strtargets = strtargets[:12]+'...]'
return "<%s %s>"%(self.__class__.__name__, strtargets)
-
- @property
- def results(self):
- return self.client.results
-
+
@property
def targets(self):
return self._targets
-
+
@targets.setter
def targets(self, value):
raise TypeError("Cannot set my targets argument after construction!")
-
+
+ @sync_results
+ def spin(self):
+ """spin the client, and sync"""
+ self.client.spin()
+
+ @sync_results
+ @save_ids
def apply(self, f, *args, **kwargs):
"""calls f(*args, **kwargs) on remote engines, returning the result.
@@ -51,7 +85,8 @@ def apply(self, f, *args, **kwargs):
returns actual result of f(*args, **kwargs)
"""
return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=False)
-
+
+ @save_ids
def apply_async(self, f, *args, **kwargs):
"""calls f(*args, **kwargs) on remote engines in a nonblocking manner.
@@ -60,7 +95,9 @@ def apply_async(self, f, *args, **kwargs):
returns msg_id
"""
return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False)
-
+
+ @spin_after
+ @save_ids
def apply_sync(self, f, *args, **kwargs):
"""calls f(*args, **kwargs) on remote engines in a blocking manner,
returning the result.
@@ -70,7 +107,9 @@ def apply_sync(self, f, *args, **kwargs):
returns: actual result of f(*args, **kwargs)
"""
return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False)
-
+
+ @sync_results
+ @save_ids
def apply_bound(self, f, *args, **kwargs):
"""calls f(*args, **kwargs) bound to engine namespace(s).
@@ -83,7 +122,9 @@ def apply_bound(self, f, *args, **kwargs):
"""
return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True)
-
+
+ @sync_results
+ @save_ids
def apply_async_bound(self, f, *args, **kwargs):
"""calls f(*args, **kwargs) bound to engine namespace(s)
in a nonblocking manner.
@@ -94,7 +135,9 @@ def apply_async_bound(self, f, *args, **kwargs):
"""
return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True)
-
+
+ @spin_after
+ @save_ids
def apply_sync_bound(self, f, *args, **kwargs):
"""calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
@@ -104,7 +147,7 @@ def apply_sync_bound(self, f, *args, **kwargs):
"""
return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
-
+
class DirectView(View):
"""Direct Multiplexer View"""

0 comments on commit a329cbf

Please sign in to comment.