Skip to content

Commit

Permalink
Merge pull request ipython#3649 from minrk/get_dict_single
Browse files Browse the repository at this point in the history
fix AsyncResult.get_dict for single result

and add tests for single-result and invalid input (multiple results on one engine).

closes ipython#3646
  • Loading branch information
minrk committed Jul 16, 2013
2 parents d8a7929 + e6b0025 commit 52eb69f
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 6 deletions.
19 changes: 13 additions & 6 deletions IPython/parallel/client/asyncresult.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,21 @@ def get_dict(self, timeout=-1):
"""

results = self.get(timeout)
if self._single_result:
results = [results]
engine_ids = [ md['engine_id'] for md in self._metadata ]
bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
maxcount = bycount.count(bycount[-1])
if maxcount > 1:
raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
maxcount, bycount[-1]))


rdict = {}
for engine_id, result in zip(engine_ids, results):
if engine_id in rdict:
raise ValueError("Cannot build dict, %i jobs ran on engine #%i" % (
engine_ids.count(engine_id), engine_id)
)
else:
rdict[engine_id] = result

return dict(zip(engine_ids,results))
return rdict

@property
def result(self):
Expand Down
17 changes: 17 additions & 0 deletions IPython/parallel/tests/test_asyncresult.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ def wait(n):
time.sleep(n)
return n

def echo(x):
return x

class AsyncResultTest(ClusterTestCase):

def test_single_result_view(self):
Expand Down Expand Up @@ -77,6 +80,20 @@ def test_get_dict(self):
for eid,r in d.iteritems():
self.assertEqual(r, 5)

def test_get_dict_single(self):
view = self.client[-1]
for v in (range(5), 5, ('abc', 'def'), 'string'):
ar = view.apply_async(echo, v)
self.assertEqual(ar.get(), v)
d = ar.get_dict()
self.assertEqual(d, {view.targets : v})

def test_get_dict_bad(self):
ar = self.client[:].apply_async(lambda : 5)
ar2 = self.client[:].apply_async(lambda : 5)
ar = self.client.get_result(ar.msg_ids + ar2.msg_ids)
self.assertRaises(ValueError, ar.get_dict)

def test_list_amr(self):
ar = self.client.load_balanced_view().map_async(wait, [0.1]*5)
rlist = list(ar)
Expand Down

0 comments on commit 52eb69f

Please sign in to comment.