Skip to content

Commit

Permalink
Refactor the threshold test code to make the test filters simpler
Browse files Browse the repository at this point in the history
  • Loading branch information
marineam committed Apr 19, 2011
1 parent 6b55bcf commit 6a4cccb
Showing 1 changed file with 37 additions and 73 deletions.
110 changes: 37 additions & 73 deletions python/nagcat/plugins/filter_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@

"""warning/critical/save filters"""

import re

from zope.interface import classProvides
from twisted.python import failure

from nagcat import errors, filters, util

Expand All @@ -40,100 +37,67 @@ def filter(self, result):
self.test.saved[self.arguments] = result
return result

class CriticalFilter(filters._Filter):
"""Mark the test as CRITICAL if the given test fails."""

classProvides(filters.IFilter)
class BaseTestFilter(filters._Filter):
"""Base class for the various test filters"""

name = "critical"
handle_default = False
handle_errors = True

# Supported operators
ops = ('>','<','=','==','>=','<=','<>','!=','=~','!~')
# Expression format
format = re.compile("\s*([<>=!~]{1,2})\s*(\S+.*)")

# Error to raise, overridden in Filter_warning
error = errors.TestCritical
raise_error = None
override_errors = ()

def __init__(self, test, default, arguments):
super(CriticalFilter, self).__init__(test, default, arguments)

match = self.format.match(arguments)
if not match:
raise errors.InitError("Invalid %s test: %s"
% (self.error.state.lower(), arguments))

self.test_op = match.group(1)
self.test_val = match.group(2)

if self.test_op not in self.ops:
raise errors.InitError("Invalid %s test operator: %s"
% (self.error.state.lower(), self.test_op))

if '~' in self.test_op:
# Check for a valid regular expression
try:
self.test_regex = re.compile(self.test_val, re.MULTILINE)
except re.error, ex:
raise errors.InitError("Invalid %s test regex '%s': %s"
% (self.error.state.lower(), self.test_val, ex))
else:
# not a regular expression, let MathString do its magic.
self.test_val = util.MathString(self.test_val)
self.test_regex = None
super(BaseTestFilter, self).__init__(test, default, arguments)

try:
self.tester = util.Tester.mktest(arguments)
except util.TesterError, ex:
raise errors.InitError("Invalid %s test: %s" % (self.name, ex))

# Convert non-python operator
if self.test_op == '=':
self.test_op = '=='
# Only attempt to handle errors if we have something to override
self.handle_errors = bool(self.override_errors)
assert self.raise_error

def filter(self, result):
# Allow critical to override warning
if (isinstance(result, errors.Failure) and
isinstance(result.value, errors.TestWarning)):
true_result = result.result
elif isinstance(result, failure.Failure):
return result
if isinstance(result, errors.Failure):
if isinstance(result.value, self.override_errors):
true_result = result.result
else:
return result
else:
true_result = result

# mimic the error.method decorator since we need to
# mimic the error.callback decorator since we need to
# report true_result, not result.
try:
if self.test_op == '=~':
if self.test_regex.search(true_result):
raise self.error("Matched regex '%s'" % self.test_val)
elif self.test_op == '!~':
if not self.test_regex.search(true_result):
raise self.error(
"Failed to match regex '%s'" % self.test_val)
else:
eval_dict = {'a':util.MathString(true_result),
'b':self.test_val}

if eval("a %s b" % self.test_op, eval_dict):
raise self.error("Test matched: %s %s"
% (self.test_op, self.test_val))
msg = self.tester.test(true_result)
if msg:
raise self.raise_error(msg)
except Exception:
result = errors.Failure(result=true_result)
return errors.Failure(result=true_result)
else:
return result

return result
class CriticalFilter(BaseTestFilter):
"""Mark the test as CRITICAL if the given test fails."""

classProvides(filters.IFilter)

name = "critical"
raise_error = errors.TestCritical
override_errors = errors.TestWarning

class WarningFilter(CriticalFilter):
class WarningFilter(BaseTestFilter):
"""Mark the test as WARNING if the given test fails."""

classProvides(filters.IFilter)

name = "warning"
handle_errors = False
error = errors.TestWarning
raise_error = errors.TestWarning

class OKFilter(CriticalFilter):
class OKFilter(BaseTestFilter):
"""Short-circuit the test and mark it as OK."""

classProvides(filters.IFilter)

name = "ok"
handle_errors = False
error = errors.TestOK
raise_error = errors.TestOK

0 comments on commit 6a4cccb

Please sign in to comment.