Skip to content

Commit

Permalink
Close tqdm after exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
rlmv committed Apr 21, 2018
1 parent a0af8c1 commit 30cbd57
Showing 1 changed file with 36 additions and 32 deletions.
68 changes: 36 additions & 32 deletions pyphi/compute/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,34 +239,40 @@ def run_parallel(self):
"""Perform the computation in parallel, reading results from the output
queue and passing them to ``process_result``.
"""
self.start_parallel()
try:
self.start_parallel()

result = self.empty_result(*self.context)
result = self.empty_result(*self.context)

while self.num_processes > 0:
r = self.result_queue.get()
self.maybe_put_task()
while self.num_processes > 0:
r = self.result_queue.get()
self.maybe_put_task()

if r is POISON_PILL:
self.num_processes -= 1
if r is POISON_PILL:
self.num_processes -= 1

elif isinstance(r, ExceptionWrapper):
r.reraise()
elif isinstance(r, ExceptionWrapper):
r.reraise()

else:
result = self.process_result(r, result)
self.progress.update(1)
else:
result = self.process_result(r, result)
self.progress.update(1)

# Did `process_result` decide to terminate early?
if self.done:
self.complete.set()
# Did `process_result` decide to terminate early?
if self.done:
self.complete.set()

self.finish_parallel()
self.finish_parallel()
except Exception:
raise
finally:
log.debug('Removing progress bar')
self.progress.close()

return result

def finish_parallel(self):
"""Terminate all processes and the log thread."""
"""Orderly shutdown of workers."""
for process in self.processes:
process.join()

Expand All @@ -281,27 +287,25 @@ def finish_parallel(self):
self.task_queue.close()
self.result_queue.close()

# Remove the progress bar
log.debug('Removing progress bar')
self.progress.close()

def run_sequential(self):
"""Perform the computation sequentially, only holding two computed
objects in memory at a time.
"""
result = self.empty_result(*self.context)

for obj in self.iterable:
r = self.compute(obj, *self.context)
result = self.process_result(r, result)
self.progress.update(1)
try:
result = self.empty_result(*self.context)

# Short-circuited?
if self.done:
break
for obj in self.iterable:
r = self.compute(obj, *self.context)
result = self.process_result(r, result)
self.progress.update(1)

# Remove progress bar
self.progress.close()
# Short-circuited?
if self.done:
break
except Exception as e:
raise e
finally:
self.progress.close()

return result

Expand Down

0 comments on commit 30cbd57

Please sign in to comment.