Skip to content
This repository has been archived by the owner on Jan 30, 2023. It is now read-only.

Commit

Permalink
Trac #27490: Address some review comments and other cleanup:
Browse files Browse the repository at this point in the history
* Use os.wait() to block until completion of a worker process (any
  worker, or even some other arbitrary child process) rather than
  an arbitrary time.sleep loop

* Abort the docbuild on error only if ABORT_ON_ERROR

* Upon shutdown, terminate() each remaining worker first before
  join()ing them

* Other minor code improvements and added more comments
  • Loading branch information
embray committed Mar 15, 2019
1 parent 219e9c4 commit 88771df
Showing 1 changed file with 38 additions and 13 deletions.
51 changes: 38 additions & 13 deletions src/sage_setup/docbuild/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from __future__ import absolute_import, print_function
from six.moves import range

import errno
import logging
import optparse
import os
Expand Down Expand Up @@ -311,38 +312,62 @@ def build_many(target, args):
from multiprocessing import Process
workers = [None] * NUM_THREADS
queue = list(args)

# Maps worker process PIDs to the name of the document it's working
# on (the argument it was passed). This is primarily used just for
# debugging/information purposes.
jobs = {}

try:
while True:
# Check the status of each worker
for idx, w in enumerate(workers):
# If a worker process exited, check its exit code; if it
# exited non-zero and ABORT_ON_ERROR is True (the default)
# raise a RuntimeError to stop the docbuild process (we
# still give other workers a chance to finish cleanly in
# the finally: block below).
if w and w.exitcode is not None:
if w.exitcode != 0:
if w.exitcode != 0 and ABORT_ON_ERROR:
raise RuntimeError(
"worker for {} died with non-zero exit code "
"{}".format(jobs[w.pid], w.exitcode))

jobs.pop(w.pid)
w = None

if w is None:
if queue:
job = queue.pop(0)
w = Process(target=target, args=(job,))
w.start()
jobs[w.pid] = job
# Worker w is dead/not started, so start a new worker
# in its place with the next document from the queue
if w is None and queue:
job = queue.pop(0)
w = Process(target=target, args=(job,))
w.start()
jobs[w.pid] = job

workers[idx] = w

if not any(filter(None, workers)):
if all(w is None for w in workers):
# If all workers are dead and there are no more items to
# process in the queue then we are done
break

time.sleep(5)
# Wait for a worker to finish (either successfully or with
# error). We ignore the return value for now and check all
# workers at the beginning of the loop.
try:
os.wait()
except OSError as exc:
# Ignore ECHILD meaning no more child processes; i.e. all
# workers are already complete.
if exc.errno != errno.ECHILD:
raise
finally:
for w in workers:
if w is not None:
w.terminate()
w.join()
remaining_workers = [w for w in workers if w is not None]
for w in remaining_workers:
# Give any remaining workers a chance to shut down gracefully
w.terminate()
for w in remaining_workers:
w.join()


##########################################
Expand Down

0 comments on commit 88771df

Please sign in to comment.