Permalink
Browse files

implemented basic scaffolding for watchdog

  • Loading branch information...
1 parent 35bc49f commit a1d78b8cf75f00a3685c7611c8ca26927bacaaf5 @lirazsiri lirazsiri committed Dec 13, 2012
Showing with 95 additions and 0 deletions.
  1. +6 −0 cloudtask/task.py
  2. +89 −0 cloudtask/watchdog.py
View
@@ -82,6 +82,7 @@
from taskconf import TaskConf
from reporter import Reporter
+from watchdog import Watchdog
class Task:
@@ -352,6 +353,8 @@ def terminate(sig, f):
raise CaughtSignal("caught %s termination signal" % sigs[sig], sig)
+ watchdog = Watchdog(session, taskconf)
+
signal.signal(signal.SIGINT, terminate)
signal.signal(signal.SIGTERM, terminate)
@@ -378,6 +381,9 @@ def terminate(sig, f):
else:
results = []
+ watchdog.terminate()
+ watchdog.join()
+
session.jobs.update(jobs, results)
session_results = [ result for job, result in session.jobs.finished ]
View
@@ -0,0 +1,89 @@
+import os
+from os.path import isdir, isfile, join
+
+import time
+from multiprocessing import Process
+
+import re
+
+class Error(Exception):
+ pass
+
+def pid_exists(pid):
+ return isdir("/proc/%d" % pid)
+
+def get_ppid(pid):
+ if not pid_exists(pid):
+ raise Error("pid %d does not exist" % pid)
+ status = file("/proc/%d/status" % pid).read()
+
+ status_dict = dict([ (key.lower(),val)
+ for key,val in [ re.split(r':\t\s*', line) for line in status.splitlines() ]])
+ return int(status_dict['ppid'])
+
+
+class Watchdog:
+ @staticmethod
+ def watchdog(session, taskconf):
+
+ print "watchdog pid: %d, ppid: %d" % (os.getpid(), os.getppid())
+
+ session_pid = os.getppid()
+ workers_path = session.paths.workers
+
+ def get_active_worker_mtimes():
+ """returns list of tuples (worker_id, worker_log_mtime)"""
+
+ if not isdir(workers_path):
+ return
+
+ for fname in os.listdir(workers_path):
+ fpath = join(workers_path, fname)
+ if not isfile(fpath):
+ continue
+
+ try:
+ worker_id = int(fname)
+ except ValueError:
+ continue
+
+ if not pid_exists(worker_id) or get_ppid(worker_id) != session_pid:
+ continue
+
+ mtime = os.stat(fpath).st_mtime
+ yield worker_id, mtime
+
+
+ while True:
+ time.sleep(1)
+
+ if not pid_exists(session_pid):
+ break
+
+ mtimes = [ mtime for worker_id, mtime in get_active_worker_mtimes() ]
+ if not mtimes:
+ continue
+
+ session_idle = time.time() - max(mtimes)
+ print "session_idle = %d" % session_idle
+
+ @classmethod
+ def run(cls, session, taskconf):
+ try:
+ cls.watchdog(session, taskconf)
+ except KeyboardInterrupt:
+ pass
+
+ def __init__(self, session, taskconf):
+ self.session = session
+ self.taskconf = taskconf
+
+ self.process = Process(target=self.run, args=(self.session, self.taskconf))
+ self.process.start()
+
+ def terminate(self):
+ self.process.terminate()
+
+ def join(self):
+ self.process.join()
+

0 comments on commit a1d78b8

Please sign in to comment.