Permalink
Browse files

Runner management, improved logging

- Runners cannot be started twice for rules AND actions
- Added logger traceback
- Added exception dumping on runner errors
  • Loading branch information...
1 parent 46dbf73 commit ea0ab94d63149672acc3c0a731e61aa30ffbed76 @sebastien committed Sep 13, 2012
Showing with 216 additions and 94 deletions.
  1. +216 −94 Sources/watchdog.py
View
310 Sources/watchdog.py
@@ -6,11 +6,11 @@
# License : Revised BSD Licensed
# -----------------------------------------------------------------------------
# Creation date : 10-Feb-2010
-# Last mod. : 20-Mar-2012
+# Last mod. : 13-Sep-2012
# -----------------------------------------------------------------------------
import re, sys, os, time, datetime, stat, smtplib, string, json, fnmatch
-import httplib, socket, threading, subprocess, glob
+import httplib, socket, threading, subprocess, glob, traceback
# TODO: Add System health metrics (CPU%, MEM%, DISK%, I/O, INODES)
@@ -231,10 +231,18 @@ def Info(cls, *message):
cls.I().info(*message)
@classmethod
+ def Debug(cls, *message):
+ cls.I().debug(*message)
+
+ @classmethod
def Sep(cls):
cls.I().sep()
@classmethod
+ def Traceback(cls):
+ cls.I().traceback()
+
+ @classmethod
def Output(cls, *message):
cls.I().output(*message)
@@ -252,6 +260,18 @@ def warn(self, *message):
def info(self, *message):
self("---", *message)
+ def debug(self, *message):
+ self(" ", *message)
+
+ def traceback( self ):
+ exception = traceback.format_exc()
+ lines = exception.split("\n")[:-1]
+ for i in range(len(lines)):
+ if i == len(lines) - 1:
+ self.err(lines[i])
+ else:
+ self.debug(lines[i])
+
def output(self, *message):
return
res = []
@@ -398,6 +418,7 @@ def Info(cls, pid):
# PROCESS INFORMATION
#
# -----------------------------------------------------------------------------
+
class System:
"""A collection of utilities to interact with system information"""
@@ -618,12 +639,33 @@ def __call__(self):
class Action:
"""Represents actions that can be triggered on rule sucess or failure."""
+ COUNT = 0
+
def __init__(self):
- pass
+ self.name = None
+ self.id = self.COUNT
+ self.COUNT += 1
+
+ def info( self, *message ):
+ Logger.I().info(*message)
+
+ def err( self, *message ):
+ Logger.I().err(*message)
+
+ def debug( self, *message ):
+ Logger.I().debug(*message)
+
+ def warn( self, *message ):
+ Logger.I().warn(*message)
def run(self, monitor, service, rule, runner):
pass
+ def __str__( self ):
+ if self.name:
+ return "<%s@%s %s>"% (self.__class__.__name__, self.name, self.id)
+ else:
+ return "<%s %s>" % (self.__class__.__name__, self.id)
class Log(Action):
"""Logs results to the given path."""
@@ -638,10 +680,10 @@ def preamble(self, monitor, service, rule, runner):
return "%s %s[%d]" % (timestamp(), service and service.name, runner.iteration)
def successMessage(self, monitor, service, rule, runner):
- return "%s --- %s succeeded (in %0.2fms)" % (self.preamble(monitor, service, rule, runner), runner.runnable, runner.duration)
+ return "%s --- %s succeeded (in %0.2fms)" % (self.preamble(monitor, service, rule, runner), runner.runable, runner.duration)
def failureMessage(self, monitor, service, rule, runner):
- return "%s [!] %s of %s (in %0.2fms)" % (self.preamble(monitor, service, rule, runner), runner.result, runner.runnable, runner.duration)
+ return "%s [!] %s of %s (in %0.2fms)" % (self.preamble(monitor, service, rule, runner), runner.result, runner.runable, runner.duration)
def run(self, monitor, service, rule, runner):
if runner.hasFailed():
@@ -912,6 +954,7 @@ def run(self, monitor, service, rule, runner):
# RULES
#
# -----------------------------------------------------------------------------
+
class Rule:
"""Rules return either a Sucess or Failure when run, and take actions
as 'fail' or 'success' arguments, which will be triggered by the
@@ -926,10 +969,11 @@ def __init__(self, freq, fail=(), success=()):
fail = tuple([fail])
if not (type(success) in (tuple, list)):
success = tuple([success])
- Rule.COUNT += 1
+ Rule.COUNT += 1
self.lastRun = 0
- self.freq = freq
- self.fail = fail
+ self.name = None
+ self.freq = freq
+ self.fail = fail
self.success = success
def shouldRunIn(self):
@@ -946,6 +990,12 @@ def run(self):
self.touch()
return Success()
+ def __str__( self ):
+ if self.name:
+ return "<%s@%s %s>"% (self.__class__.__name__, self.name, self.id)
+ else:
+ return "<%s %s>" % (self.__class__.__name__, self.id)
+
class HTTP(Rule):
@@ -1167,15 +1217,17 @@ def run(self):
# SERVICE
#
# -----------------------------------------------------------------------------
+
class Service:
"""A service is a collection of rules and actions. Rules are executed
and actions are triggered according to the rules result."""
# FIXME: Add a check() method that checks that actions exists for rules
def __init__(self, name=None, monitor=(), actions={}):
- self.name = name
- self.rules = []
+ self.name = name
+ self.rules = []
+ self.runners = {}
self.actions = {}
if not (type(monitor) in (tuple, list)):
monitor = tuple([monitor])
@@ -1192,24 +1244,12 @@ def getAction(self, nameOrAction):
else:
return self.actions[nameOrAction]
- def act(self, name, event):
- """Runs the action with the given name."""
- assert name in self.actions.keys()
- # NOTE: Document the protocol
- # FIXME: Use pools ?
- runner = Runner.Create(self.actions[name])
- if runner:
- runner.run(event, self)
- else:
- Logger.Err("Cannot execute action because Runner.POOL is full: %s" % (self))
-
-
# -----------------------------------------------------------------------------
#
-# RUNNER
+# RUNNER POOL
#
# -----------------------------------------------------------------------------
-# FIXME: Nos sure if pools are really necessary, they're not used so far
+
class Pool:
"""Pools are used in Watchdog to limit the number of runners/rules executed
at once. Pools have a maximum capacity, so that you can limit the numbers
@@ -1219,6 +1259,10 @@ def __init__(self, capacity):
self.capacity = capacity
self.elements = []
+ def setCapacity( self, capacity ):
+ self.capacity = capacity
+ return self
+
def add(self, element):
if self.canAdd():
self.elements.append(element)
@@ -1236,35 +1280,61 @@ def remove(self, element):
def size(self):
return len(self.elements)
+# -----------------------------------------------------------------------------
+#
+# RUNNER
+#
+# -----------------------------------------------------------------------------
+
+class RunnerStillRunning(Exception):
+
+ def __init__(self, runner):
+ Exception.__init__(self, "Runner is still running: " + str(runner))
+ self.runner = runner
+
+class RunnerThreadPoolFull(Exception):
+
+ def __init__(self, capacity):
+ Exception.__init__(self, "Runner thread pool has reached full capacity (%s)" % (capacity))
+ self.capacity = capacity
+
class Runner:
"""Wraps a Rule or Action in a separate thread an invoked the 'onEnded'
callback once the rule is executed."""
- POOL = Pool(100)
+ POOL = Pool(50)
@classmethod
- def Create(cls, runnable, context=None, iteration=None):
+ def Create(cls, runable, context=None, iteration=None, id=None):
if Runner.POOL.canAdd():
- runner = Runner(runnable, context, iteration, Runner.POOL)
+ runner = Runner(runable, context, iteration, Runner.POOL, id=id)
Runner.POOL.add(runner)
return runner
else:
return None
- def __init__(self, runnable, context=None, iteration=None, pool=None):
- assert isinstance(runnable, Action) or isinstance(runnable, Rule)
- self._onRunEnded = None
- self.runnable = runnable
- self.context = context
- self.result = None
- self.iteration = iteration
+ def __init__(self, runable, context=None, iteration=None, pool=None, id=None):
+ assert isinstance(runable, Action) or isinstance(runable, Rule)
+ self._onRunEnded = None
+ self.runable = runable
+ self.context = context
+ self.result = None
+ self.iteration = iteration
self.creationTime = now()
- self.startTime = -1
- self.endTime = -1
- self.duration = 0
- self.pool = pool
- self._thread = threading.Thread(target=self._run)
+ self.startTime = -1
+ self.endTime = -1
+ self.duration = 0
+ self.pool = pool
+ self.id = id
+ self._thread = threading.Thread(target=self._run)
+ # We want the threads to be "daemonic", ie. they will all stop once
+ # the main watchdog stops.
+ # SEE: http://docs.python.org/release/2.5.2/lib/thread-objects.html
+ self._thread.setDaemon(True)
+
+ def getID( self ):
+ return self.id
def onRunEnded(self, callback):
self._onRunEnded = callback
@@ -1280,148 +1350,200 @@ def run(self, *args):
def _run(self):
self.startTime = now()
- #try:
- if True:
- self.result = self.runnable.run(*self.args)
+ try:
+ self.result = self.runable.run(*self.args)
if isinstance(self.result, Success) or isinstance(self.result, Failure):
self.result.duration = self.duration
- #except Exception, e:
- # self.result = e
- # # FIXME: Rewrite this properly
- # Logger.Err("Exception occured in 'run' with: %s %s" % (e, self.runnable))
- self.endTime = now()
+ except Exception, e:
+ self.result = e
+ Logger.Err("Exception occured in 'run' with: %s %s" % (e, self.runable))
+ Logger.Traceback()
+ self.endTime = now()
self.duration = self.endTime - self.startTime
try:
if self.pool:
self.pool.remove(self)
except Exception, e:
- Logger.Err("Exception occured in 'run/pool' with: %s %s" % (e, self.runnable))
+ Logger.Err("Exception occured in 'run/pool' with: %s %s" % (e, self.runable))
+ Logger.Traceback()
try:
if self._onRunEnded:
self._onRunEnded(self)
except Exception, e:
- Logger.Err("Exception occured in 'run/onRunEnded' with: %s %s" % (e, self.runnable))
+ Logger.Err("Exception occured in 'run/onRunEnded' with: %s %s" % (e, self.runable))
+ Logger.Traceback()
# -----------------------------------------------------------------------------
#
# MONITOR
#
# -----------------------------------------------------------------------------
+
class Monitor:
- """The monitor is at the core of the watchdog. Rules declared in registered
+ """The monitor is at the core of the Watchdog. Rules declared in registered
services are run, and actions are executed according to the result."""
FREQUENCY = Time.s(5)
def __init__(self, *services):
- self.services = []
- self.isRunning = False
- self.freq = self.FREQUENCY
- self.logger = Logger(prefix="watchdog ")
- self.iteration = 0
+ """Creats a new monitor with the given services."""
+ self.services = []
+ self.isRunning = False
+ self.freq = self.FREQUENCY
+ self.logger = Logger(prefix="watchdog ")
+ self.iteration = 0
self.iterationLastDuration = 0
- self.runners = {}
+ self.runners = {}
map(self.addService, services)
- def runnerForRule(self, rule, context, iteration):
- if rule.id in self.runners.keys():
- # FIXME: Should kill stuck threads
- runner = self.runners[rule.id]
- if iteration - runner.iteration < 5:
- self.logger.err("Previous iteration's rule is still running: %s, you should increase its frequency." % (rule))
- else:
- self.logger.err("Previous iteration's rule %s seems to be still stuck after %s iteration." % (rule, runner.iteration - iteration))
- return None
- else:
- runner = Runner.Create(rule, context=context, iteration=iteration)
- self.runners[runner.runnable.id] = runner
- if runner:
- runner.onRunEnded(self.onRuleEnded)
- return runner
- else:
- self.logger.err("Cannot create runner for rule: %s (thread pool reached full capacity)" % (rule))
-
def addService(self, service):
+ """Adds a service to this monitor."""
self.services.append(service)
+ return self
def run(self, iterations=-1):
+ """Runs this Monitor for the given number of `iterations`.
+ If `iterations` is `-1` then the monitor will run indefinitely."""
Signals.Setup()
self.isRunning = True
while self.isRunning:
it_start = now()
next_run = it_start + self.freq
+ # For each registered service
for service in self.services:
+ # For each rule within the service
for rule in service.rules:
+ # We check if the rule has to be executed right now
+ # or a little bit later
to_wait = rule.shouldRunIn()
if to_wait > 0:
+ # If we have to wait, then we indicate what would
+ # be the time to run the rule
next_run = min(now() + to_wait, next_run)
else:
- # Create a runner
- runner = self.runnerForRule(rule, service, self.iteration)
+ # The rule has to be run right now, so we try to get a
+ # runner for it. This might fail if we can't create the
+ # runner
+ runner = self.getRunnerForRule(rule, service, self.iteration)
if runner:
rule.touch()
runner.run()
- else:
- # FIXME: Rule should fail because it can't be
- # executed
- pass
next_run = min(
now() + rule.freq,
next_run
)
- duration = now() - it_start
+ # We've reached the end of an iteration
+ duration = now() - it_start
self.iterationLastDuration = duration
self.logger.info(self.getStatusMessage())
- self.iteration += 1
+ self.iteration += 1
# Sleeps waiting for the next run
sleep_time = max(0, next_run - now())
if sleep_time > 0:
if sleep_time > 1000:
self.logger.info("Sleeping for %0.2fs" % (sleep_time / 1000.0))
time.sleep(sleep_time / 1000.0)
+ # In case we've exceeded the number of iterations, we stop the loop
if iterations > 0 and self.iteration >= iterations:
self.isRunning = False
- def getStatusMessage(self):
- return "#%d (runners=%d,threads=%d,duration=%0.2fs)" % (self.iteration, Runner.POOL.size(), threading.activeCount(), self.iterationLastDuration)
+ def getRunnerForRule( self, rule, service, iteration ):
+ try:
+ return self._createRunner( rule, service, iteration, self.onRuleEnded )
+ except RunnerStillRunning, e:
+ if self.iteration - runner.iteration < 5:
+ self.logger.err("Previous iteration's rule is still running: %s, you should increase its frequency." % (rule))
+ else:
+ self.logger.err("Previous iteration's rule %s seems to be still stuck after %s iteration." % (rule, runner.iteration - self.iteration))
+ return None
+ except RunnerThreadPoolFull, e:
+ self.logger.err("Cannot create runner for rule: %s (thread pool reached full capacity)" % (rule))
+ return None
+
+ def getRunnerForAction( self, rule, action, service, iteration ):
+ runner_id = "%s:%s" % (id(rule), id(action))
+ try:
+ return self._createRunner( action, service, iteration, self.onActionEnded, runner_id )
+ except RunnerStillRunning, e:
+ if self.iteration - runner.iteration < 5:
+ self.logger.err("Previous iteration's action is still running: %s.%s, you should increase its frequency." % (rule, str(action)))
+ else:
+ self.logger.err("Previous iteration's action %s.%s seems to be still stuck after %s iteration." % (rule, str(action), runner.iteration - self.iteration))
+ return None
+ except RunnerThreadPoolFull, e:
+ self.logger.err("Cannot create runner for action: %s.%s (thread pool reached full capacity)" % (rule, str(action)))
+ return None
def onRuleEnded(self, runner):
"""Callback bound to 'Runner.onRunEnded', trigerred once a rule was executed.
If the rule failed, actions will be executed."""
- # FIXME: Handle exception
- rule = runner.runnable
- service = runner.context
+ rule = runner.runable
+ service = runner.context
+ iteration = runner.iteration
if isinstance(runner.result, Success):
if rule.success:
- #self.logger.info("Success actions:", ", ".join(rule.success))
for action in rule.success:
action_object = service.getAction(action)
- action_runner = Runner.Create(action_object)
+ action_runner = self.getRunnerForAction(rule, action_object, service, self.iteration)
if action_runner:
action_runner.run(self, service, rule, runner)
else:
- self.logger.err("Cannot create action runner for: %s" % (action_object))
+ self.logger.err("Cannot create success action runner for: %s" % (action_object))
elif isinstance(runner.result, Failure):
self.logger.err("Failure on ", rule, ":", runner.result)
if rule.fail:
#self.logger.info("Failure actions:", ", ".join(rule.fail))
for action in rule.fail:
action_object = service.getAction(action)
- action_runner = Runner.Create(action_object)
+ action_runner = self.getRunnerForAction(rule, action_object, service, self.iteration)
if action_runner:
action_runner.run(self, service, rule, runner)
else:
- self.logger.err("Cannot create action runner for: %s" % (action_object))
+ self.logger.err("Cannot create failure action runner for: %s" % (action_object))
else:
#self.logger.info("No failure action to trigger")
pass
else:
self.logger.err("Rule did not return Success or Failure instance: %s, got %s" % (rule, runner.result))
# We unregister the runnner
- del self.runners[rule.id]
+ del self.runners[runner.getID()]
+
+ def onActionEnded( self, runner ):
+ # We unregister the runnner
+ del self.runners[runner.getID()]
+
+ def _createRunner(self, runable, context, iteration, callback, runableId=None):
+ """Creates a runner for the given runable, making sure that it won't
+ be started twice, raising `RunnerStillRunning`
+ or `RunnerThreadPoolFull` in case of problems."""
+ # FIXME: we should prefix the ID with the Rule name, if any
+ if runableId is None:
+ runable_id = id(runable)
+ else:
+ runable_id = runableId
+ if hasattr(runable, "describe"): description = runable.descibe()
+ else: description = str(runable)
+ if runable_id in self.runners:
+ runner = self.runners[runable_id]
+ raise RunnerStillRunning(runner)
+ else:
+ runner = Runner.Create(runable, context=context, iteration=iteration, id=runable_id)
+ if runner:
+ self.runners[runner.getID()] = runner
+ runner.onRunEnded(callback)
+ return runner
+ else:
+ raise RunnerThreadPoolFull(Runner.POOL.capacity)
+
+ def getStatusMessage(self):
+ return "#%d (runners=%d,threads=%d,duration=%0.2fs)" % (self.iteration, Runner.POOL.size(), threading.activeCount(), self.iterationLastDuration)
-# Globals
+# -----------------------------------------------------------------------------
+#
+# GLOBALS
+#
+# -----------------------------------------------------------------------------
SUCCESS = Success()
FAILURE = Failure()

0 comments on commit ea0ab94

Please sign in to comment.