Permalink
Browse files

watchdog: reap idle workers

  • Loading branch information...
1 parent a1d78b8 commit 5139d50beabba3c32b187da46830559dcbc91d67 @lirazsiri lirazsiri committed Dec 14, 2012
Showing with 52 additions and 11 deletions.
  1. +52 −11 cloudtask/watchdog.py
View
@@ -2,8 +2,10 @@
from os.path import isdir, isfile, join
import time
+import signal
from multiprocessing import Process
+import traceback
import re
class Error(Exception):
@@ -23,20 +25,29 @@ def get_ppid(pid):
class Watchdog:
- @staticmethod
- def watchdog(session, taskconf):
+ SIGTERM_TIMEOUT = 300
+ SIGTERM_TIMEOUT = 3
+
+ @classmethod
+ def watchdog(cls, session, taskconf):
+
+ def terminate(s, t):
+ print "watchdog received termination signal"
+
+ #signal.signal(signal.SIGTERM, terminate)
print "watchdog pid: %d, ppid: %d" % (os.getpid(), os.getppid())
session_pid = os.getppid()
workers_path = session.paths.workers
- def get_active_worker_mtimes():
+ def get_active_workers():
"""returns list of tuples (worker_id, worker_log_mtime)"""
if not isdir(workers_path):
- return
+ return []
+ active_workers = []
for fname in os.listdir(workers_path):
fpath = join(workers_path, fname)
if not isfile(fpath):
@@ -51,21 +62,51 @@ def get_active_worker_mtimes():
continue
mtime = os.stat(fpath).st_mtime
- yield worker_id, mtime
+ active_workers.append((worker_id, mtime))
+ return active_workers
- while True:
- time.sleep(1)
+ def log(s):
+ session.mlog.write("# watchdog: %s\n" % s)
- if not pid_exists(session_pid):
- break
+ watchdog_timeout = taskconf.timeout * 2
+ session_idle = 0
- mtimes = [ mtime for worker_id, mtime in get_active_worker_mtimes() ]
+ while pid_exists(session_pid) and session_idle < watchdog_timeout:
+ time.sleep(1)
+
+ mtimes = [ mtime for worker_id, mtime in get_active_workers() ]
if not mtimes:
continue
session_idle = time.time() - max(mtimes)
- print "session_idle = %d" % session_idle
+
+ if session_idle >= watchdog_timeout:
+ log("session idle after %d seconds" % watchdog_timeout)
+
+ # SIGTERM active workers
+ for worker_id, worker_mtime in get_active_workers():
+ try:
+ log("kill -TERM %d" % worker_id)
+ os.kill(worker_id, signal.SIGTERM)
+ except:
+ traceback.print_exc(file=session.mlog)
+
+ # wait up to SIGTERM_TIMEOUT for them to terminate
+ started = time.time()
+ while time.time() - started < cls.SIGTERM_TIMEOUT:
+ time.sleep(1)
+ active_workers = get_active_workers()
+ if not active_workers:
+ break
+
+ # no more Mr. Nice Guy: SIGKILL workers that are still alive
+ for worker_id, worker_mtime in get_active_workers():
+ try:
+ log("kill -KILL %d" % worker_id)
+ os.kill(worker_id, signal.SIGKILL)
+ except:
+ traceback.print_exc(file=session.mlog)
@classmethod
def run(cls, session, taskconf):

0 comments on commit 5139d50

Please sign in to comment.