Skip to content

Commit

Permalink
Use multiprocessing.Event instead of shared bool for running
Browse files Browse the repository at this point in the history
  • Loading branch information
ryansm1 committed Oct 8, 2018
1 parent 4f83466 commit 1f10fbd
Showing 1 changed file with 12 additions and 13 deletions.
25 changes: 12 additions & 13 deletions dramatiq/main.py
Expand Up @@ -261,7 +261,7 @@ def setup_worker_logging(args, worker_id, logging_pipe):
return get_logger("dramatiq", "WorkerProcess(%s)" % worker_id)


def worker_process(args, worker_id, logging_pipe):
def worker_process(args, worker_id, logging_pipe, running):
start_method = multiprocessing.get_start_method()
if start_method == "spawn":
exit = sys.exit
Expand Down Expand Up @@ -291,10 +291,9 @@ def worker_process(args, worker_id, logging_pipe):
return exit(RET_CONNECT)

def termhandler(signum, frame):
nonlocal running
if running:
if running.is_set():
logger.info("Stopping worker process...")
running = False
running.clear()
else:
logger.warning("Killing worker process...")
return exit(RET_KILLED)
Expand All @@ -305,9 +304,9 @@ def termhandler(signum, frame):
if hasattr(signal, "SIGHUP"):
signal.signal(signal.SIGHUP, termhandler)

running = True
while running:
time.sleep(1)
running.set()
while running.is_set():
running.wait(1.0)

worker.stop()
broker.close()
Expand All @@ -329,11 +328,12 @@ def main(): # noqa

worker_pipes = []
worker_processes = []
running = multiprocessing.Event()
for worker_id in range(args.processes):
read_pipe, write_pipe = multiprocessing.Pipe()
proc = multiprocessing.Process(
target=worker_process,
args=(args, worker_id, StreamablePipe(write_pipe)),
args=(args, worker_id, StreamablePipe(write_pipe), running),
daemon=True,
)
proc.start()
Expand All @@ -348,7 +348,8 @@ def main(): # noqa
if args.pid_file:
atexit.register(remove_pidfile, args.pid_file, logger)

running, reload_process = True, False
running.set()
reload_process = False

# To avoid issues with signal delivery to user threads on
# platforms such as FreeBSD 10.3, we make the main thread block
Expand All @@ -365,13 +366,11 @@ def main(): # noqa
file_watcher = setup_file_watcher(args.watch, args.watch_use_polling)

def watch_logs(worker_pipes):
nonlocal running

if args.log_file is None:
log_file = sys.stderr
else:
log_file = open(args.log_file, mode="a", encoding="utf-8")
while running:
while running.is_set():
pipes = [parent_read_mp_pipe] + worker_pipes
events = multiprocessing.connection.wait(pipes, timeout=1)
for event in events:
Expand Down Expand Up @@ -434,7 +433,7 @@ def sighandler(signum, frame):
rc = proc.exitcode
retcode = max(retcode, rc)

running = False
running.clear()
if HAS_WATCHDOG and args.watch:
file_watcher.stop()
file_watcher.join()
Expand Down

0 comments on commit 1f10fbd

Please sign in to comment.