Permalink
Browse files

threadpool: clear the exceptions queue in wait()

Updated ThreadPool.wait() to drain all exceptions out of the exception
queue everytime it raises a ThreadPoolException. This keeps the
exception queue from potentially growing endlessly and also prevents
raising old exceptions.
  • Loading branch information...
1 parent 262475e commit 7cee04185875897473d3143293248df1f183c64b @jtriley committed Mar 13, 2013
Showing with 13 additions and 2 deletions.
  1. +9 −0 starcluster/tests/test_threadpool.py
  2. +4 −2 starcluster/threadpool.py
@@ -111,3 +111,12 @@ def test_map_with_jobid(self):
except exception.ThreadPoolException, e:
exc, tb_msg, jobid = e.exceptions[0]
assert jobid == '21'
+
+ def test_exception_queue(self):
+ assert self.pool._exception_queue.qsize() == 0
+ r = 10
+ try:
+ self.pool.map(lambda x: x ** 2, [str(i) for i in range(r)])
+ except exception.ThreadPoolException, e:
+ assert len(e.exceptions) == r
+ assert self.pool._exception_queue.qsize() == 0
@@ -153,9 +153,11 @@ def wait(self, numtasks=None, return_results=True):
if pbar.maxval != 0:
pbar.finish()
self.join()
- if self._exception_queue.qsize() > 0:
+ exc_queue = self._exception_queue
+ if exc_queue.qsize() > 0:
+ excs = [exc_queue.get() for i in range(exc_queue.qsize())]
raise exception.ThreadPoolException(
- "An error occurred in ThreadPool", self._exception_queue.queue)
+ "An error occurred in ThreadPool", excs)
if return_results:
return self.get_results()

0 comments on commit 7cee041

Please sign in to comment.