Skip to content
Browse files

protect LBView.targets, AsyncResult._msg_ids -> .msg_ds

  • Loading branch information...
1 parent c98ee05 commit 7d08ffdeb80d4bac1cf0a25d6452194bcdd843b4 @minrk minrk committed Jan 27, 2011
Showing with 15 additions and 7 deletions.
  1. +4 −4 IPython/zmq/parallel/asyncresult.py
  2. +3 −3 IPython/zmq/parallel/client.py
  3. +8 −0 IPython/zmq/parallel/view.py
View
8 IPython/zmq/parallel/asyncresult.py
@@ -23,15 +23,15 @@ class AsyncResult(object):
"""
def __init__(self, client, msg_ids):
self._client = client
- self._msg_ids = msg_ids
+ self.msg_ids = msg_ids
self._ready = False
self._success = None
def __repr__(self):
if self._ready:
return "<%s: finished>"%(self.__class__.__name__)
else:
- return "<%s: %r>"%(self.__class__.__name__,self._msg_ids)
+ return "<%s: %r>"%(self.__class__.__name__,self.msg_ids)
def _reconstruct_result(self, res):
@@ -74,10 +74,10 @@ def wait(self, timeout=-1):
"""
if self._ready:
return
- self._ready = self._client.barrier(self._msg_ids, timeout)
+ self._ready = self._client.barrier(self.msg_ids, timeout)
if self._ready:
try:
- results = map(self._client.results.get, self._msg_ids)
+ results = map(self._client.results.get, self.msg_ids)
results = error.collect_exceptions(results, 'get')
self._result = self._reconstruct_result(results)
except Exception, e:
View
6 IPython/zmq/parallel/client.py
@@ -504,7 +504,7 @@ def barrier(self, msg_ids=None, timeout=-1):
if isinstance(msg_id, int):
msg_id = self.history[msg_id]
elif isinstance(msg_id, AsyncResult):
- map(theids.add, msg_id._msg_ids)
+ map(theids.add, msg_id.msg_ids)
continue
theids.add(msg_id)
if not theids.intersection(self.outstanding):
@@ -849,7 +849,7 @@ def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
r = self.push({key: partition[0]}, targets=engineid, block=False)
else:
r = self.push({key: partition}, targets=engineid, block=False)
- msg_ids.extend(r._msg_ids)
+ msg_ids.extend(r.msg_ids)
r = AsyncResult(self, msg_ids)
if block:
return r.get()
@@ -866,7 +866,7 @@ def gather(self, key, dist='b', targets='all', block=None):
mapObject = Map.dists[dist]()
msg_ids = []
for index, engineid in enumerate(targets):
- msg_ids.extend(self.pull(key, targets=engineid,block=False)._msg_ids)
+ msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids)
r = AsyncMapResult(self, msg_ids, mapObject)
if block:
View
8 IPython/zmq/parallel/view.py
@@ -344,4 +344,12 @@ class LoadBalancedView(View):
def __repr__(self):
return "<%s %s>"%(self.__class__.__name__, self.client._addr)
+ @property
+ def targets(self):
+ return None
+
+ @targets.setter
+ def targets(self, value):
+ raise AttributeError("Cannot set targets for LoadbalancedView!")
+

0 comments on commit 7d08ffd

Please sign in to comment.
Something went wrong with that request. Please try again.