Skip to content

Commit

Permalink
Merge pull request #1442 from gst/backport_livestatus
Browse files Browse the repository at this point in the history
Backport mod-livestatus to branch 1.4
  • Loading branch information
Seb-Solon committed Jan 15, 2015
2 parents 49f401f + 3b5ac89 commit 80fc333
Show file tree
Hide file tree
Showing 54 changed files with 1,143 additions and 560 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
language: python
python:
- "2.7"
- "2.6"

# command to install dependencies
install:
- pip install -r shinken/dependencies || pip install -r requirements.txt
Expand Down
2 changes: 2 additions & 0 deletions requirements.tests.txt
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
mock
unittest2 # for python2.6

3 changes: 3 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
Pyro==3.14
# for python2.6:
unittest2

File renamed without changes.
5 changes: 3 additions & 2 deletions shinken/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,9 @@ def _log(self, level, message, format=None, print_it=True, display_level=True):
# DEBUG level logs are logged by the daemon locally
# and must not be forwarded to other satellites, or risk overloading them.
if level != logging.DEBUG:
b = Brok('log', {'log': s})
obj.add(b)
if obj: # obj can be still None if not yet initialized by load_obj().
b = Brok('log', {'log': s})
obj.add(b)

# If local logging is enabled, log to the defined handler, file.
if local_log is not None:
Expand Down
139 changes: 64 additions & 75 deletions shinken/modules/livestatus_broker/livestatus.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,25 @@
from livestatus_response import LiveStatusResponse
from livestatus_broker_common import LiveStatusQueryError

_VALID_QUERIES_TYPE_SORTED = (
# used in handle_request() to validate the components we get in one query/request.
# each inner tuple is sorted alphabetically: command < query < wait
( 'command', ), # there can be multiple commands also, special cased below..
( 'query', ),
( 'query', 'wait' ),
( 'command', 'query' ),
( 'command', 'query', 'wait' ),
)

def _is_valid_queries(queries_type):
assert isinstance(queries_type, tuple)
return (
queries_type in _VALID_QUERIES_TYPE_SORTED
# special case: we accept one or many commands, in one request:
or all(qtype == 'command' for qtype in queries_type)
)


class LiveStatus(object):
"""A class that represents the status of all objects in the broker
Expand All @@ -51,93 +70,63 @@ def __init__(self, datamgr, query_cache, db, pnp_path, return_queue, counters=No
def handle_request(self, data):
try:
return self.handle_request_and_fail(data)
except LiveStatusQueryError, exp:
except LiveStatusQueryError as err:
# LiveStatusQueryError(404, table)
# LiveStatusQueryError(450, column)
code, detail = exp.args
response = LiveStatusResponse()
response.set_error(code, detail)
if 'fixed16' in data:
response.responseheader = 'fixed16'
return response.respond()
except Exception, exp:
logger.error("[Livestatus] Exception! %s" % exp)
code, output = err.args
except Exception as err:
logger.error("[Livestatus] Unexpected error during process of request %r : %s" % (
data, err))
# Also show the exception
output = cStringIO.StringIO()
traceback.print_exc(file=output)
logger.error("[Livestatus] Back trace of this exception: %s" % (output.getvalue()))
output.close()
# Ok now we can return something
response = LiveStatusResponse()
response.set_error(452, data)
if 'fixed16' in data:
response.responseheader = 'fixed16'
return response.respond()
trb = traceback.format_exc()
logger.error("[Livestatus] Back trace of this exception: %s" % trb)
code = 500
output = err
# Ok now we can return something
response = LiveStatusResponse()
response.set_error(code, output)
if 'fixed16' in data:
response.responseheader = 'fixed16'
return response.respond()

def handle_request_and_fail(self, data):
"""Execute the livestatus request.
This function creates a LiveStatusRequest method, calls the parser,
This function creates a LiveStatusRequest instance, calls the parser,
handles the execution of the request and formatting of the result.
"""
request = LiveStatusRequest(data, self.datamgr, self.query_cache, self.db, self.pnp_path, self.return_queue, self.counters)
request.parse_input(data)
squeries = sorted([q.my_type for q in request.queries])
if squeries == ['command', 'query', 'wait']:
# The Multisite way
for query in [q for q in request.queries if q.my_type == 'command']:
result = query.launch_query()
response = query.response
response.format_live_data(result, query.columns, query.aliases)
output, keepalive = response.respond()
output = [q for q in request.queries if q.my_type == 'wait'] + [q for q in request.queries if q.my_type == 'query']
elif squeries == ['query', 'wait']:
# The Thruk way
output = [q for q in request.queries if q.my_type == 'wait'] + [q for q in request.queries if q.my_type == 'query']
queries = sorted(request.queries, key=lambda q: q.my_type) # sort alphabetically on the query type
queries_type = tuple(query.my_type for query in queries) # have to tuple it for testing with 'in' :
if not _is_valid_queries(queries_type):
logger.error("[Livestatus] We currently do not handle this kind of composed request: %s" % queries_type)
return '', False

cur_idx = 0
keepalive = False

for query in queries: # process the command(s), if any.
# as they are sorted alphabetically, once we get one which isn't a 'command'..
if query.my_type != 'command': # then we are done.
break
query.process_query()
# according to Check_Mk:
# COMMAND don't require a response, that is no response or more simply: an empty response:
output = ''
cur_idx += 1

if 'wait' in queries_type:
keepalive = True
elif squeries == ['command', 'query']:
for query in [q for q in request.queries if q.my_type == 'command']:
result = query.launch_query()
response = query.response
response.format_live_data(result, query.columns, query.aliases)
output, keepalive = response.respond()
for query in [q for q in request.queries if q.my_type == 'query']:
# This was a simple query, respond immediately
result = query.launch_query()
# Now bring the retrieved information to a form which can be sent back to the client
response = query.response
response.format_live_data(result, query.columns, query.aliases)
output, keepalive = response.respond()

elif squeries == ['query']:
for query in [q for q in request.queries if q.my_type == 'query']:
# This was a simple query, respond immediately
result = query.launch_query()
# Now bring the retrieved information to a form which can be sent back to the client
response = query.response
response.format_live_data(result, query.columns, query.aliases)
output, keepalive = response.respond()

elif squeries == ['command']:
for query in [q for q in request.queries if q.my_type == 'command']:
result = query.launch_query()
response = query.response
response.format_live_data(result, query.columns, query.aliases)
output, keepalive = response.respond()

elif [q.my_type for q in request.queries if q.my_type != 'command'] == []:
# Only external commands. Thruk uses it when it sends multiple
# objects into a downtime.
for query in [q for q in request.queries if q.my_type == 'command']:
result = query.launch_query()
response = query.response
response.format_live_data(result, query.columns, query.aliases)
output, keepalive = response.respond()
else:
# We currently do not handle this kind of composed request
output = ""
logger.error("[Livestatus] We currently do not handle this kind of composed request: %s" % squeries)
# we return 'wait' first and 'query' second..
output = list(reversed(queries[cur_idx:]))
elif len(queries[cur_idx:]):
# last possibility :
assert (
1 == len(queries[cur_idx:])
and query == queries[cur_idx] and query.my_type == 'query'
)
output, keepalive = query.process_query()

logger.debug("[Livestatus] Request duration %.4fs" % (time.time() - request.tic))
return output, keepalive
Expand Down
22 changes: 12 additions & 10 deletions shinken/modules/livestatus_broker/livestatus_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def __init__(self, modconf):
self.rg = LiveStatusRegenerator(self.service_authorization_strict, self.group_authorization_strict)
self.client_connections = {} # keys will be socket of client, values are LiveStatusClientThread instances
self.db = None
self.listeners = []
self._listening_thread = threading.Thread(target=self._listening_thread_run)

def add_compatibility_sqlite_module(self):
Expand Down Expand Up @@ -383,8 +384,6 @@ def manage_brok(self, brok):

def do_stop(self):
logger.info("[Livestatus Broker] So I quit")
for s in self.input:
full_safe_close(s)
# client threads could be stopped and joined by the listening_thread..
for client in self.client_connections.values():
assert isinstance(client, LiveStatusClientThread)
Expand All @@ -394,16 +393,17 @@ def do_stop(self):
client.join()
if self._listening_thread:
self._listening_thread.join()
# inputs must be closed after listening_thread
for s in self.listeners:
full_safe_close(s)
try:
self.db.close()
pass
except Exception as err:
logger.warning('Error on db close: %s' % err)


def create_listeners(self):
backlog = 5
self.listeners = []
if self.port:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)
Expand All @@ -425,13 +425,12 @@ def create_listeners(self):
sock.listen(backlog)
self.listeners.append(sock)
logger.info("[Livestatus Broker] listening on unix socket: %s" % str(self.socket))
self.input = self.listeners[:]


def _listening_thread_run(self):
while not self.interrupted:
# Check for pending livestatus requests
inputready, _, exceptready = select.select(self.input, [], [], 1)
inputready, _, exceptready = select.select(self.listeners, [], [], 1)

if len(exceptready) > 0:
pass
Expand Down Expand Up @@ -480,6 +479,7 @@ def _listening_thread_run(self):
# while updating
def manage_lql_thread(self):
logger.info("[Livestatus Broker] Livestatus query thread started")
self.db.open() # make sure to open the db in this thread..
# This is the main object of this broker where the action takes place
self.livestatus = LiveStatus(self.datamgr, self.query_cache, self.db, self.pnp_path, self.from_q)
self.create_listeners()
Expand Down Expand Up @@ -521,10 +521,12 @@ def manage_lql_thread(self):
self.db.commit_and_rotate_log_db()

# end: while not self.interrupted:

self.do_stop()

def write_protocol(self, request, response):
def write_protocol(self, request=None, response=None, sent=0):
if self.debug_queries:
print "REQUEST>>>>>\n" + request + "\n\n"
#print "RESPONSE<<<<\n" + response + "\n\n"
if request is not None:
print "REQUEST>>>>>\n" + request + "\n\n"
if response is not None:
print "RESPONSE<<<<\n" + response + "\n"
print "RESPONSE SENT<<<<\n %s \n\n" % sent
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ class LiveStatusQueryError(Exception):
404: 'Invalid GET request, no such table \'%s\'',
450: 'Invalid GET request, no such column \'%s\'',
452: 'Completely invalid GET request \'%s\'',
500: 'Internal server error: %r',
}

0 comments on commit 80fc333

Please sign in to comment.