Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

watchdog: implemented robust cleanup logic

  • Loading branch information...
commit 380237dd6813f6cb70091d4dfcec6f2148c0f8b9 1 parent 277fffc
@lirazsiri lirazsiri authored
Showing with 57 additions and 3 deletions.
  1. +57 −3 cloudtask/watchdog.py
View
60 cloudtask/watchdog.py
@@ -8,6 +8,9 @@
import traceback
import re
+import logalyzer
+from _hub import Hub
+
class Error(Exception):
    """Exception type defined by this module."""
@@ -67,9 +70,39 @@ def idletime(self):
return time.time() - max(mtimes)
idletime = property(idletime)
+class Retrier:
+ def __init__(self, timeout, errorsleep, errorlog=None):
+ self.timeout = timeout
+ self.errorsleep = errorsleep
+ self.errorlog = errorlog
+
+ def __call__(self, callable, *args, **kwargs):
+ started = time.time()
+
+ while (time.time() - started) < self.timeout:
+ try:
+ return callable(*args, **kwargs)
+
+ except KeyboardInterrupt:
+ break
+
+ except:
+ if self.errorlog:
+ traceback.print_exc(file=self.errorlog)
+
+ time.sleep(self.errorsleep)
+
+ raise
+
class Watchdog:
SIGTERM_TIMEOUT = 300
- SIGTERM_TIMEOUT = 3
+
+ DESTROY_ERROR_TIMEOUT = 3600*3
+ DESTROY_ERROR_SLEEP = 300
+
+ @staticmethod
+ def log(fh, s):
+ fh.write("# watchdog: %s\n" % s)
@classmethod
def watch(cls, workers_path, logfh, timeout):
@@ -78,7 +111,7 @@ def watch(cls, workers_path, logfh, timeout):
watcher = SessionWatcher(session_pid, workers_path)
def log(s):
- logfh.write("# watchdog: %s\n" % s)
+ cls.log(logfh, s)
idletime = None
@@ -142,11 +175,32 @@ def stop(s, t):
except Stopped:
pass
+ signal.signal(signal.SIGTERM, signal.SIG_IGN)
cls.cleanup(session, taskconf)
@classmethod
def cleanup(cls, session, taskconf):
    """Destroy instances that the workers log still lists as zombies.

    A worker entry counts as a zombie when it has an instanceid but a
    false instancetime (presumably: launched but never recorded as
    destroyed -- confirm against logalyzer.WorkersLog).

    Does nothing when no zombies are found. Otherwise the instance ids
    are logged to session.mlog and destroyed through the Hub API, with
    failures retried for up to DESTROY_ERROR_TIMEOUT seconds and
    DESTROY_ERROR_SLEEP seconds between attempts.
    """
    workers_log = logalyzer.WorkersLog(session.paths.workers, taskconf.command)
    zombie_instances = sorted(
        worker.instanceid
        for worker in workers_log.workers
        if worker.instanceid and not worker.instancetime
    )
    if not zombie_instances:
        return

    cls.log(session.mlog,
            "destroying zombie instances: " + " ".join(zombie_instances))

    hub = Hub(taskconf.hub_apikey)
    retry = Retrier(cls.DESTROY_ERROR_TIMEOUT, cls.DESTROY_ERROR_SLEEP,
                    session.mlog)

    # hub.destroy is unpacked below as (ipaddress, instanceid) pairs
    destroyed = retry(hub.destroy, *zombie_instances)
    destroyed_ids = sorted(instanceid for ipaddress, instanceid in destroyed)

    cls.log(session.mlog,
            "destroyed zombie instances: " + " ".join(destroyed_ids))

+
def __init__(self, session, taskconf):
self.session = session
Please sign in to comment.
Something went wrong with that request. Please try again.