Skip to content

Commit

Permalink
Merge pull request #2808 from minrk/parallel_wait
Browse files Browse the repository at this point in the history
improve patience for slow Hub in client tests

adds a first step in `_wait_for_idle`, where it waits for all tasks to arrive before waiting for no tasks to be running.  On a super slow machine, it was possible for `_wait_for_idle` to return prematurely, before tasks had even started.

closes #2807
  • Loading branch information
minrk committed Jan 19, 2013
2 parents 145a64f + 59bc5cd commit 9db8811
Showing 1 changed file with 22 additions and 7 deletions.
29 changes: 22 additions & 7 deletions IPython/parallel/tests/test_client.py
Expand Up @@ -304,6 +304,20 @@ def _wait_for_idle(self):
"""wait for an engine to become idle, according to the Hub"""
rc = self.client

# step 1. wait for all requests to be noticed
# timeout 5s, polling every 100ms
msg_ids = set(rc.history)
hub_hist = rc.hub_history()
for i in range(50):
if msg_ids.difference(hub_hist):
time.sleep(0.1)
hub_hist = rc.hub_history()
else:
break

self.assertEqual(len(msg_ids.difference(hub_hist)), 0)

# step 2. wait for all requests to be done
# timeout 5s, polling every 100ms
qs = rc.queue_status()
for i in range(50):
Expand Down Expand Up @@ -407,7 +421,7 @@ def test_purge_hub_results(self):
# Wait for the Hub to realise the result is done:
# This prevents a race condition, where we
# might purge a result the Hub still thinks is pending.
time.sleep(0.1)
self._wait_for_idle()
rc2 = clientmod.Client(profile='iptest')
hist = self.client.hub_history()
ahr = rc2.get_result([hist[-1]])
Expand All @@ -423,7 +437,7 @@ def test_purge_local_results(self):
res = []
for i in range(5):
res.append(self.client[:].apply_async(lambda : 1))
time.sleep(0.1)
self._wait_for_idle()
self.client.wait(10) # wait for the results to come back
before = len(self.client.results)
self.assertEqual(len(self.client.metadata),before)
Expand All @@ -446,28 +460,29 @@ def test_purge_all_results(self):
for i in range(5):
self.client[:].apply_sync(lambda : 1)
self.client.wait(10)
self._wait_for_idle()
self.client.purge_results('all')
self.assertEqual(len(self.client.results), 0, msg="Results not empty")
self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
time.sleep(0.1)
hist = self.client.hub_history()#
hist = self.client.hub_history()
self.assertEqual(len(hist), 0, msg="hub history not empty")

def test_purge_everything(self):
# ensure there are some tasks
for i in range(5):
self.client[:].apply_sync(lambda : 1)
self.client.wait(10)
self._wait_for_idle()
self.client.purge_everything()
# The client results
self.assertEqual(len(self.client.results), 0, msg="Results not empty")
self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
# the hub results
hist = self.client.hub_history()
self.assertEqual(len(hist), 0, msg="hub history not empty")
# The client "bookkeeping"
self.assertEqual(len(self.client.session.digest_history), 0, msg="session digest not empty")
self.assertEqual(len(self.client.history), 0, msg="client history not empty")
# the hub results
hist = self.client.hub_history()
self.assertEqual(len(hist), 0, msg="hub history not empty")


def test_spin_thread(self):
Expand Down

0 comments on commit 9db8811

Please sign in to comment.