Add: first version of a Trending broker module, and libexec scripts.

commit 7bb71007f078f269373bc6bf74873a305aff7e1d (parent: 1ba683c)
@naparuba authored
etc/services/linux_local.cfg
@@ -14,6 +14,11 @@ define service{
host_name localhost
check_command check_local_load
+ check_interval 1
+
+ # Sample for trending
+ trending_policies week
+
}
etc/shinken-specific.cfg
@@ -205,6 +205,7 @@ define broker {
# - Graphite-Perfdata = Use a Graphite time series DB for perfdata
# - WebUI = Shinken Web interface
# - glpidb = Save data in GLPI MySQL database
+ # - MongodbTrending = Save perfdata into a trending database
modules Livestatus, Simple-log, WebUI
## Advanced
@@ -614,6 +615,22 @@ define module{
queue_dump_frequency 300 ; frequency (in seconds) on which the queue is saved for retention
}
+
+## Module: MongodbTrending
+## Loaded by: Broker
+# Enable trending computation in MongoDB so it can be queried afterwards. Only
+# elements with a valid trending_policies entry will be looked at.
+define module {
+ module_name MongodbTrending
+ module_type trending_broker
+ uri mongodb://localhost/?safe=true
+ database shinken
+
+ # Advanced option if you are running a cluster environment
+ # replica_set
+}
+
+
## Module: MongodbRetention
## Loaded by: Scheduler
# Retention file to keep state between process restarts.
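Taken together, the two configuration hunks above give the full wiring: the module has to be listed on the broker's modules line, and every host or service to trend needs a trending_policies entry. A minimal sketch, reusing the names defined in this commit:

    define broker {
        ...
        modules Livestatus, Simple-log, WebUI, MongodbTrending
    }

    define service {
        host_name localhost
        check_command check_local_load
        check_interval 1
        trending_policies week
    }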
libexec/trending/trending_dump_centreon.py
@@ -0,0 +1,240 @@
+#!/usr/bin/python
+
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2009-2012:
+# Gabes Jean, naparuba@gmail.com
+# Gerhard Lausser, Gerhard.Lausser@consol.de
+# Gregory Starck, g.starck@gmail.com
+# Hartmut Goebel, h.goebel@goebel-consult.de
+#
+# This file is part of Shinken.
+#
+# Shinken is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Shinken is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Shinken. If not, see <http://www.gnu.org/licenses/>.
+
+import csv
+import sys
+import optparse
+import MySQLdb
+import os
+import gzip
+
+con = None
+output_dir = None
+gzip_enabled = False
+
+def connect(mysql_server, mysql_user, mysql_password, mysql_db):
+ global con
+ try:
+ con = MySQLdb.connect(mysql_server, mysql_user, mysql_password, mysql_db)
+ cur = con.cursor()
+ cur.execute("SELECT VERSION()")
+ data = cur.fetchone()
+ print "Database version : %s " % data
+
+ except MySQLdb.Error, e:
+ print "Error %d: %s" % (e.args[0],e.args[1])
+ sys.exit(1)
+
+
+def get_hosts():
+ res = []
+ cur = con.cursor()
+ cur.execute("select distinct(host_name) from index_data order by host_name;")
+ rows = cur.fetchall()
+ for row in rows:
+ #print row[0]
+ res.append(row[0])
+ return res
+
+
+def list_hosts():
+ hosts = get_hosts()
+ for h in hosts:
+ print h
+
+
+def get_host(hname):
+ services = []
+ cur = con.cursor()
+ cur.execute("select service_description from index_data where host_name='%s';" % hname)
+ rows = cur.fetchall()
+ for row in rows:
+ #print row[0]
+ services.append(row[0])
+ return services
+
+
+def list_host(hname):
+ services = get_host(hname)
+ for s in services:
+ print s
+
+
+def get_metrics(hname, sdesc):
+ metrics = []
+ cur = con.cursor()
+ cur.execute("select id from index_data where host_name='%s' and service_description='%s';" % (hname, sdesc))
+ s_id = cur.fetchone()[0]
+ print "SERVICE ID", s_id
+ cur.execute("select metric_name from metrics where index_id=%s;" % s_id)
+ rows = cur.fetchall()
+ for row in rows:
+ metrics.append(row[0])
+
+ return metrics
+
+
+def list_service(hname, sdesc):
+ metrics = get_metrics(hname, sdesc)
+ for m in metrics:
+ print m
+
+
+def save_metric_file(hname, sdesc, metric, datas):
+ global output_dir
+ global gzip_enabled
+
+ p = '%s.%s.%s.csv' % (hname, sdesc, metric)
+ pth = os.path.join(output_dir, p)
+
+ if gzip_enabled:
+ #print "GZIP compression enable"
+ pth = pth+'.gz'
+ print "AS FILE", pth
+ f = gzip.open(pth, "wb")
+ else:
+ print "AS FILE", pth
+ f = open(pth, "wb")
+
+ c = csv.writer(f, delimiter=';')
+ c.writerow(["#%s"%hname,"%s"%sdesc,"%s"%metric])
+
+ for row in datas:
+ #print row
+ c.writerow(row)
+
+
+def dump_metric(hname, sdesc, metric):
+ cur = con.cursor()
+ cur.execute("select id from index_data where host_name='%s' and service_description='%s';" % (hname, sdesc))
+ s_id = cur.fetchone()[0]
+ #print "SERVICE ID", s_id
+ cur.execute("select metric_id from metrics where index_id='%s' and metric_name='%s';" % (s_id, metric))
+ try:
+ m_id = cur.fetchone()[0]
+ except TypeError:
+ print " NO ENTRY FOR", "select metric_id from metrics where index_id='%s' and metric_name='%s';" % (s_id, metric)
+ #print "Metric id", m_id
+
+ cur.execute("select count(*) from data_bin where id_metric=%s;" % m_id)
+ count = cur.fetchone()[0]
+ print 'DUMPING %s %s %s (' % (hname, sdesc, metric), count, "entries)"
+
+
+ cur.execute("select ctime, value from data_bin where id_metric=%s order by ctime;" % m_id)
+ rows = cur.fetchall()
+
+ save_metric_file(hname, sdesc, metric, rows)
+
+
+def dump_service(hname, sdesc):
+ metrics = get_metrics(hname, sdesc)
+ for m in metrics:
+ dump_metric(hname, sdesc, m)
+
+
+def dump_host(hname):
+ services = get_host(hname)
+ for sdesc in services:
+ dump_service(hname, sdesc)
+
+
+
+if __name__ == '__main__':
+ parser = optparse.OptionParser(
+ "%prog [options]", version="%prog " + '0.1')
+ parser.add_option('-M', '--mysql_server',
+ dest="mysql_server",
+ help='Host name of the mysql server')
+ parser.add_option('-u', '--user',
+ dest="mysql_user",
+ help='Mysql user name')
+ parser.add_option('-p', '--password',
+ dest="mysql_password",
+ help='Mysql user password')
+ parser.add_option('-d', '--database',
+ dest="mysql_db",
+ help='Centstorage DB')
+ parser.add_option('-H', '--host_name',
+ dest="host_name",
+ help="Hostname to dump")
+ parser.add_option('-s', '--service_description',
+ dest="service_description",
+ help="Service description to dump")
+ parser.add_option('-m', '--metric',
+ dest="metric",
+ help="Metric to dump")
+ parser.add_option('-l', '--list',
+ dest="list_only", action='store_true',
+ help="Only list the elment metrics")
+ parser.add_option('-f', '--full',
+ dest="full_dump", action='store_true',
+ help="Do a full dump of the database. Prepare disk space!")
+ parser.add_option('-o', '--output_dir',
+ dest="output_dir",
+ help="Directory where to save the data")
+ parser.add_option('-z', '--enable-gzip',
+ dest="gzip_enabled", action='store_true',
+ help="Enable the gzip compression of csv files (output as csv.gz)")
+ opts, args = parser.parse_args()
+
+ mysql_server = opts.mysql_server or 'localhost'
+ mysql_user = opts.mysql_user or 'root'
+ mysql_password = opts.mysql_password or ''
+ mysql_db = opts.mysql_db or 'centstorage'
+ hname = opts.host_name
+ sdesc = opts.service_description
+ metric = opts.metric
+ list_only = opts.list_only
+ full_dump = opts.full_dump
+ output_dir = opts.output_dir or '.'
+ gzip_enabled = opts.gzip_enabled
+ connect(mysql_server, mysql_user, mysql_password, mysql_db)
+
+ if list_only:
+ if not hname:
+ list_hosts()
+ sys.exit(0)
+ if hname and not sdesc:
+ list_host(hname)
+ sys.exit(0)
+ if hname and sdesc and not metric:
+ list_service(hname, sdesc)
+ sys.exit(0)
+ sys.exit(0)
+
+ if hname and not sdesc:
+ dump_host(hname)
+ sys.exit(0)
+
+ if hname and sdesc and not metric:
+ dump_service(hname, sdesc)
+ sys.exit(0)
+
+ if hname and sdesc and metric:
+ dump_metric(hname, sdesc, metric)
+ sys.exit(0)
+
+ if not hname and not sdesc and not metric:
+ print "YOU ARE A MORON that want it's server to go down!"
libexec/trending/trending_dump_pnp4nagios.py
@@ -0,0 +1,146 @@
+#!/usr/bin/python
+
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2009-2012:
+# Gabes Jean, naparuba@gmail.com
+# Gerhard Lausser, Gerhard.Lausser@consol.de
+# Gregory Starck, g.starck@gmail.com
+# Hartmut Goebel, h.goebel@goebel-consult.de
+#
+# This file is part of Shinken.
+#
+# Shinken is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Shinken is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Shinken. If not, see <http://www.gnu.org/licenses/>.
+
+from lxml.etree import parse
+from pprint import pprint
+import os
+import sys
+import optparse
+import csv
+import rrdtool
+import gzip
+
+# Some global vars
+output_dir = None
+gzip_enabled = False
+
+
+def parse_xml(xml_path):
+ x = {}
+ tree = parse(xml_path)
+
+ x['hname'] = tree.find('NAGIOS_HOSTNAME').text
+ x['sdesc'] = tree.find('NAGIOS_SERVICEDESC').text
+ x['ds'] = []
+ for ds in tree.findall('DATASOURCE'):
+ v = {}
+ v['id'] = ds.find('DS').text
+ v['metric'] = ds.find('NAME').text
+ v['rrdfile'] = ds.find('RRDFILE').text
+ #print v
+ x['ds'].append(v)
+ return x
+
+
+def get_day_value(rrdfile, day, ds_id):
+ res = []
+ c_start = '-s -%s' % (day+1)
+ c_start += 'd'
+ c_end = '-e -%s' % day
+ c_end += 'd'
+ #print c_start, c_end
+ v = rrdtool.fetch(rrdfile, 'AVERAGE', '-r 300', c_start, c_end)
+ #print v
+ period = v[0]
+ dss = list(v[1])
+ values = v[2]
+ #print "VALUES", day, len(values)
+ start = period[0]
+ step = period[2]
+ idx = dss.index(ds_id)
+ #print "INDEX", idx
+ pos = start
+ for c in values:
+ timestamp = pos
+ #print timestamp, c[idx]
+ res.append( (timestamp, c[idx]) )
+ pos += step
+ return res
+
+
+def save_metric_file(hname, sdesc, metric, datas):
+ p = '%s.%s.%s.csv' % (hname, sdesc, metric)
+ pth = os.path.join(output_dir, p)
+
+ if gzip_enabled:
+ #print "GZIP compression enable"
+ pth = pth+'.gz'
+ print "AS FILE", pth
+ f = gzip.open(pth, "wb")
+ else:
+ print "AS FILE", pth
+ f = open(pth, "wb")
+
+ c = csv.writer(f, delimiter=';')
+ c.writerow(["#%s"%hname,"%s"%sdesc,"%s"%metric])
+
+ for row in datas:
+ #print row
+ c.writerow(row)
+
+
+if __name__ == '__main__':
+ parser = optparse.OptionParser(
+ "%prog [options]", version="%prog " + '0.1')
+ parser.add_option('-x', '--xml',
+ dest="xml_path",
+ help='Path of the PNP4Nagios xml file to import')
+ parser.add_option('-l', '--list',
+ dest="list_only", action='store_true',
+ help="Only list the elment metrics")
+ parser.add_option('-o', '--output_dir',
+ dest="output_dir",
+ help="Directory where to save the data")
+ parser.add_option('-z', '--enable-gzip',
+ dest="gzip_enabled", action='store_true',
+ help="Enable the gzip compression of csv files (output as csv.gz)")
+ opts, args = parser.parse_args()
+
+ xml_path = opts.xml_path
+ output_dir = opts.output_dir or '.'
+ gzip_enabled = opts.gzip_enabled
+
+ if not xml_path:
+ print "ERROR : no xml file set (-x). Exiting"
+ sys.exit(2)
+
+ x = parse_xml(xml_path)
+ #print x
+
+ for ds in x['ds']:
+ values = []
+ rrdfile = ds['rrdfile']
+ dsid = ds['id']
+ inverse_days = range(1, 180)
+ inverse_days.reverse()
+ for day in inverse_days:
+ res = get_day_value(rrdfile, day, dsid)
+ values.extend(res)
+ #print "VALUES", values
+
+ #for c in values:
+ # print c
+
+ save_metric_file(x['hname'], x['sdesc'], ds['metric'], values)
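A sample invocation, assuming a standard PNP4Nagios perfdata tree (the XML path is hypothetical):

    ./trending_dump_pnp4nagios.py -x /var/lib/pnp4nagios/perfdata/srv1/Load.xml -o /tmp -z

The script replays roughly the last 180 days of each RRD datasource at a 300s resolution and writes one CSV per metric, using the same naming scheme as the Centreon dumper.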
libexec/trending/trending_import_csv.py
@@ -0,0 +1,439 @@
+#!/usr/bin/python
+
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2009-2012:
+# Gabes Jean, naparuba@gmail.com
+# Gerhard Lausser, Gerhard.Lausser@consol.de
+# Gregory Starck, g.starck@gmail.com
+# Hartmut Goebel, h.goebel@goebel-consult.de
+#
+# This file is part of Shinken.
+#
+# Shinken is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Shinken is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Shinken. If not, see <http://www.gnu.org/licenses/>.
+
+import csv
+import time
+import sys
+import math
+import matplotlib.pyplot as plt
+import pymongo
+import math
+import optparse
+import gzip
+
+
+MAX_DIFFS = []
+
+# Some global vars
+# (we are in a script, so it's ok)
+
+con = None
+coll = None
+reader = None
+datas = {}
+raw_datas = {}
+
+hname = None
+sdesc = None
+metric = None
+
+# 15min chunks
+CHUNK_INTERVAL = 900
+nb_chunks = int(math.ceil(86400.0/CHUNK_INTERVAL))
+print "NB_chunk", nb_chunks
+
+# Monday = 0
+for i in range(0, 7):
+ print "CREATING DAY", i
+ print "SHOULD CREATE", nb_chunks, "values by day"
+ datas[i] = [None for j in range(0, nb_chunks)]
+ raw_datas[i] = [None for j in range(0, nb_chunks)]
+
+
+
+#nb_chunks = 86400/CHUNK_INTERVAL
+_times = range(0, nb_chunks*len(datas))
+
+_t = []
+ultra_raw = []
+_raw_t = []
+
+
+
+def open_connexion():
+ global con, coll
+ # SAFE IS VERY VERY important!
+ con = pymongo.Connection('localhost', safe=True)
+ coll = con.shinken.trending
+ print coll
+
+
+def open_csv(path, gzip_enabled):
+ # Ok reopen it with
+ if gzip_enabled:
+ f = gzip.open(path, 'rb')
+ else:
+ f = open(path, 'rb')
+ reader = csv.reader(f, delimiter=';')
+ return reader
+
+
+
+
+class Load:
+ def __init__(self, m=1, initial_value=0):
+ self.exp = 0 # first exp
+ self.m = m # Number of minute of the avg
+ self.val = initial_value # first value
+ self.nb_update = 1
+
+
+ def update_load(self, new_val, interval):
+ try:
+ diff = interval
+ self.exp = 1 / math.exp(diff / (self.m * 60.0))
+ self.val = new_val + self.exp * (self.val - new_val)
+ except OverflowError: # if the time change without notice, we overflow :(
+ pass
+ except ZeroDivisionError: # do not care
+ pass
+
+
+ #self.val += new_val
+ #self.nb_update += 1
+
+ def get_load(self):
+ return self.val# / self.nb_update
+
+
+def quick_update(prev_val, new_val, m, interval):
+ l = Load(m=m, initial_value=prev_val)
+ l.update_load(new_val, interval)
+ return l.get_load()
+
+
+
+def get_sec_from_morning(t):
+ t_lt = time.localtime(t)
+ h = t_lt.tm_hour
+ m = t_lt.tm_min
+ s = t_lt.tm_sec
+ return h * 3600 + m * 60 + s
+
+
+def get_wday(t):
+ t_lt = time.localtime(t)
+ return t_lt.tm_wday
+
+
+def get_day(t):
+ return int(t - get_sec_from_morning(t))
+
+
+def get_start_of_day(year, month_id, day):
+ start_time = (year, month_id, day, 00, 00, 00, 0, 0, -1)
+ try:
+ start_time_epoch = time.mktime(start_time)
+ except OverflowError:
+ # Windows mktime sometimes crashes on (1970, 1, 1, ...)
+ start_time_epoch = 0.0
+
+ return start_time_epoch
+
+
+def get_previous_chunk(wday, chunk_nb):
+ if chunk_nb == 0:
+ chunk_nb = nb_chunks - 1
+ wday -= 1
+ else:
+ chunk_nb -= 1
+ wday = wday % 7
+ return (wday, chunk_nb)
+
+
+
+def get_key(hname, sdesc, metric, wday, chunk_nb):
+ return hname+'.'+sdesc+'.'+metric+'.'+'week'+'.'+str(wday)+'.'+'Vtrend'+'.'+str(chunk_nb)
+
+
+def update_avg(wday, chunk_nb, l1, hname, sdesc, metric):
+ key = get_key(hname, sdesc, metric, wday, chunk_nb)#hname+'.'+sdesc+'.'+metric+'.'+'week'+'.'+str(wday)+'.'+'Vtrend'+'.'+str(chunk_nb)
+ doc = coll.find_one({'_id' : key})
+ if not doc:
+ doc = {'hname':hname, 'sdesc':sdesc, 'metric':metric, 'cycle':'week',
+ 'wday':wday, 'chunk_nb':chunk_nb, 'Vtrend':l1, '_id':key,
+ 'VtrendSmooth':l1, 'VcurrentSmooth':l1, 'Vcurrent':l1
+ }
+ coll.save(doc)
+ else:
+ prev_val = doc['Vtrend']
+
+ new_Vtrend = quick_update(prev_val, l1, 5, 5)
+
+ # Now we smooth with the last value
+ # And by default, we are using the current value
+ new_VtrendSmooth = doc['VtrendSmooth']
+
+ prev_doc = None
+ cur_wday = wday
+ cur_chunk_nb = chunk_nb
+ # Ok by default take the current avg
+ prev_val = doc['VtrendSmooth']
+ prev_val_short = doc['VcurrentSmooth']
+
+ cur_wday, cur_chunk_nb = get_previous_chunk(cur_wday, cur_chunk_nb)
+ prev_key = get_key(hname, sdesc, metric, cur_wday, cur_chunk_nb)#hname+'.'+sdesc+'.'metric+'.'+'week'+str(cur_wday)+'.'+'Vtrend'+str(cur_chunk_nb)
+ prev_doc = coll.find_one({'_id' : prev_key})
+ if prev_doc:
+ prev_val = prev_doc['VtrendSmooth']
+ prev_val_short = prev_doc['VcurrentSmooth']
+ else:
+ print "OUPS, the key", key, "do not have a previous entry", cur_wday, cur_chunk_nb
+
+ # Ok really update the value now
+ new_VtrendSmooth = quick_update(prev_val, new_Vtrend, 1, 5)
+
+ # Ok and now last minutes trending
+ new_VcurrentSmooth = quick_update(prev_val_short, l1, 1, 15)
+ global MAX_DIFFS
+ d = (abs(new_VcurrentSmooth - new_VtrendSmooth)/float(new_VtrendSmooth)) * 100
+ if d > 200:
+ MAX_DIFFS.append(d)
+ print len(MAX_DIFFS)
+ print new_VcurrentSmooth, new_VtrendSmooth
+ print "DIFF", (abs(new_VcurrentSmooth - new_VtrendSmooth)/float(new_VtrendSmooth)) * 100
+
+ coll.update({'_id' : key}, {'$set' : { 'Vtrend': new_Vtrend, 'VtrendSmooth': new_VtrendSmooth, 'VcurrentSmooth' : new_VcurrentSmooth, 'Vcurrent':l1 }})
+
+
+def update_in_memory(wday, chunk_nb, l1):
+ day = datas[wday]
+ ld = day[chunk_nb]
+
+ # Update the raw_data for this day
+ raw_day = raw_datas[wday]
+ #if not raw_day[chunk_nb]:
+ raw_day[chunk_nb] = l1
+
+ #print "Current ld", ld
+ if not ld:
+ #print "NEW"* 20, chunk_nb
+ ld = Load(m=5, initial_value=l1)
+ day[chunk_nb] = ld
+ else:
+ # WAS 5
+ ld.update_load(l1, 5)
+
+
+
+def import_csv(reader, _hname, _sdesc, _metric):
+ global hname, sdesc, metric
+ i = 0
+ for row in reader:
+ if i == 0:
+ # Maybe the first line is a helper line; if so,
+ # use it
+ if len(row) == 3 and row[0].startswith('#'):
+ hname = row[0][1:]
+ sdesc = row[1]
+ metric = row[2]
+ print "VOIDING COLLECTION", coll, "from entries", hname, sdesc, metric
+ coll.remove({'hname':hname, 'sdesc':sdesc, 'metric':metric})
+ i += 1
+ _hname = hname
+ _sdesc = sdesc
+ _metric = metric
+ try:
+ _time = int(row[0])
+ except ValueError:
+ continue
+ try:
+ l1 = float(row[1])
+ except IndexError:
+ continue
+ except ValueError:
+ continue
+ sec_from_morning = get_sec_from_morning(_time)
+ wday = get_wday(_time)
+
+ chunk_nb = sec_from_morning / CHUNK_INTERVAL
+
+ update_in_memory(wday, chunk_nb, l1)
+
+ # Now update mongodb
+ update_avg(wday, chunk_nb, l1, _hname, _sdesc, _metric)
+
+
+
+
+def compute_memory_smooth():
+ global _raw_t, datas, _t
+ for (wday, day) in datas.iteritems():
+ print "wday", wday
+
+ #_t = []
+ last_good = -1
+ cur_l = None
+ for d in day:
+ if not d:
+ _t.append(last_good)
+ continue
+ last_good = d.get_load()
+ if not cur_l:
+ cur_l = Load(m=1, initial_value=last_good)
+ cur_l.update_load(last_good, 5)
+ #_t.append(last_good)
+ _t.append(cur_l.get_load())#last_good)
+
+
+ #ultra_raw = []
+
+ # Also get the raw one
+ raw_day = raw_datas[wday]
+ #_raw_t = []
+ cur_l = None
+ i = -1
+ for d in raw_day:
+ i += 1
+ if not d:
+ print "No entry for", wday, i
+ _raw_t.append(last_good)
+ ultra_raw.append(last_good)
+ continue
+ last_good = d
+ if not cur_l:
+ cur_l = Load(m=1, initial_value=last_good)
+ cur_l.update_load(last_good, 10)
+ #_raw_t.append(last_good)
+ _raw_t.append(cur_l.get_load())#last_good)
+ ultra_raw.append(last_good)
+
+ #print _t
+
+ avg_t = sum(_t)/len(_t)
+ avg_raw_t = sum(_raw_t)/len(_t)
+
+ pct_failed = 0.0
+ for i in xrange(len(_raw_t)):
+ try:
+ pct_failed += abs(_raw_t[i] - _t[i]) / _t[i]
+ except ZeroDivisionError:
+ pass
+ print "pct_failed", pct_failed
+ pct_failed /= len(_raw_t)
+ print "pct_failed", pct_failed
+
+
+
+
+def get_graph_values(hname, sdesc, metric, mkey):
+ _from_mongo = []
+ for wday in range(0, 7):
+ for chunk_nb in range(nb_chunks):
+ key = get_key(hname, sdesc, metric, wday, chunk_nb)#hname+sdesc+metric+'week'+str(wday)+'Vtrend'+str(chunk_nb)
+ doc = coll.find_one({'_id':key})
+ if doc:
+ # WAS
+ _from_mongo.append(doc[mkey])
+ else:
+ print "NO ENTRY FOR", key
+ prev_wday, prev_chunk_nb = get_previous_chunk(wday, chunk_nb)
+ key = get_key(hname, sdesc, metric, prev_wday, prev_chunk_nb)#hname+sdesc+metric+'week'+str(prev_wday)+'Vtrend'+str(prev_chunk_nb)
+ doc = coll.find_one({'_id':key})
+ if doc:
+ _from_mongo.append(doc[mkey])
+ else:
+ print "Really ! ENTRY FOR", key
+ _from_mongo.append(-1)
+
+ return _from_mongo
+
+
+
+
+
+
+
+if __name__ == '__main__':
+ parser = optparse.OptionParser(
+ "%prog [options]", version="%prog " + '0.1')
+ parser.add_option('-c', '--csv',
+ dest="csv_file",
+ help='CSV to import')
+ parser.add_option('-H', '--host_name',
+ dest="host_name",
+ help="Hostname of the imported data")
+ parser.add_option('-s', '--service_description',
+ dest="service_description",
+ help="Service description of the imported data")
+ parser.add_option('-m', '--metric', dest='metric',
+ help="Metric name of the imported data")
+ parser.add_option('-z', '--enable-gzip',
+ dest="gzip_enabled", action='store_true',
+ help="Enable the gzip compression to read csv.gz files")
+ parser.add_option('-p', '--print',
+ dest='do_print', action='store_true',
+ help='Print the loaded trending')
+
+ opts, args = parser.parse_args()
+
+
+ # ok open the connexion
+ open_connexion()
+
+
+ hname = opts.host_name
+ sdesc = opts.service_description
+ metric = opts.metric
+ do_print = opts.do_print
+ csv_file = opts.csv_file
+ gzip_enabled = opts.gzip_enabled
+
+ if csv_file:
+ reader = open_csv(csv_file, gzip_enabled)
+ import_csv(reader, hname, sdesc, metric)
+ compute_memory_smooth()
+
+ if do_print:
+ _from_mongo = get_graph_values(hname, sdesc, metric, 'VtrendSmooth')
+ _from_mongo_short = get_graph_values(hname, sdesc, metric, 'VcurrentSmooth')
+ _el_raw = get_graph_values(hname, sdesc, metric, 'Vcurrent')
+ #print "PYMONGO?", _from_mongo
+ pct_failed = 0.0
+ for i in xrange(len(_from_mongo)):
+ try:
+ #pct_failed += abs(_raw_t[i] - _from_mongo[i]) / _from_mongo[i]
+ pct_failed += abs(_from_mongo_short[i] - _from_mongo[i]) / _from_mongo[i]
+ except ZeroDivisionError:
+ pass
+ except IndexError:
+ print "Was trying to get", i
+ except TypeError:
+ print "Was trying to get", i
+ print "pct_failed MONGO:", pct_failed
+ pct_failed /= len(_from_mongo)
+ print "pct_failed MONGO:", pct_failed
+
+
+ print len(_times)
+ print len(_t)
+ # The _from_mongo* series only exist when -p/--print was given
+ if do_print and csv_file:
+ plt.plot(_times, _t, 'b', _times, ultra_raw, 'c', _times, _from_mongo, 'r', _times, _from_mongo_short, 'm')#, _times, _el_raw, 'y')
+ plt.axis(ymin=0)
+ plt.show()
+ elif do_print:
+ plt.plot(_times, _from_mongo, 'r', _times, _from_mongo_short, 'y')
+ plt.show()
+
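A typical round trip with one of the dumpers above (file name hypothetical; the header line written by the dump scripts already carries host, service and metric, so -H/-s/-m are only needed when it is missing):

    ./trending_import_csv.py -c srv1.Load.load1.csv.gz -z -p

With -p, once the import is done, matplotlib plots the raw samples against the smoothed in-memory curve and the VtrendSmooth/VcurrentSmooth series read back from MongoDB.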
shinken/load.py
@@ -47,15 +47,20 @@ def __init__(self, m=1, initial_value=0):
self.last_update = 0 # last update of the value
self.val = initial_value # first value
- def update_load(self, new_val):
+ #
+ def update_load(self, new_val, forced_interval=None):
# The first call does not change the value, it just tags
# the beginning of last_update
- if self.last_update == 0:
+ # If an interval is forced, bypass the time bookkeeping
+ if not forced_interval and self.last_update == 0:
self.last_update = time.time()
return
now = time.time()
try:
- diff = now - self.last_update
+ if forced_interval:
+ diff = forced_interval
+ else:
+ diff = now - self.last_update
self.exp = 1 / math.exp(diff / (self.m * 60.0))
self.val = new_val + self.exp * (self.val - new_val)
self.last_update = now
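The smoothing is an exponentially weighted moving average: each update computes val = new_val + exp(-diff / (60 * m)) * (val - new_val), so m sets the averaging window in minutes. The new forced_interval argument lets offline scripts replay historical samples at a fixed virtual interval instead of wall-clock time. A minimal sketch of how the trending code drives it:

    from shinken.load import Load

    l = Load(m=1, initial_value=0)                 # 1-minute average
    for sample in (10, 10, 10, 10):
        l.update_load(sample, forced_interval=60)  # pretend 60s elapsed between samples
    print l.get_load()                             # climbs toward 10 (about 9.8 here)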
shinken/modules/trending_broker.py
@@ -0,0 +1,334 @@
+#!/usr/bin/python
+
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2009-2012:
+# Gabes Jean, naparuba@gmail.com
+# Gerhard Lausser, Gerhard.Lausser@consol.de
+# Gregory Starck, g.starck@gmail.com
+# Hartmut Goebel, h.goebel@goebel-consult.de
+#
+# This file is part of Shinken.
+#
+# Shinken is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Shinken is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with Shinken. If not, see <http://www.gnu.org/licenses/>.
+
+"""This Class is a plugin for the Shinken Broker. It is in charge
+to brok information of the service/host perfdatas into the Graphite
+backend. http://graphite.wikidot.com/start
+"""
+
+import re
+import math
+
+from shinken.basemodule import BaseModule
+from shinken.util import get_sec_from_morning, get_wday
+from shinken.log import logger
+from shinken.load import Load
+
+try:
+ from pymongo.connection import Connection
+except ImportError:
+ Connection = None
+
+try:
+ from pymongo import ReplicaSetConnection, ReadPreference
+except ImportError:
+ ReplicaSetConnection = None
+ ReadPreference = None
+
+
+properties = {
+ 'daemons': ['broker'],
+ 'type': 'trending_broker',
+ 'external': False,
+ }
+
+
+# Called by the plugin manager to get a broker
+def get_instance(mod_conf):
+ logger.info("[Trengind broker] Get a trending data module for plugin %s" % mod_conf.get_name())
+ instance = Trending_broker(mod_conf)
+ return instance
+
+
+# Class for the Trending Broker
+# Get broks and send them to a Mongodb instance with smooth averages
+class Trending_broker(BaseModule):
+ def __init__(self, modconf):
+ BaseModule.__init__(self, modconf)
+ self.backend = getattr(modconf, 'backend', 'mongodb')
+ self.uri = getattr(modconf, 'uri', 'localhost')
+ self.database = getattr(modconf, 'database', 'shinken')
+ self.replica_set = getattr(modconf, 'replica_set', '')
+
+ # 15min chunks
+ self.chunk_interval = int(getattr(modconf, 'chunk_interval', '900'))
+ self.nb_chunks = int(math.ceil(86400.0/self.chunk_interval))
+
+ # Some variable init
+ self.con = None
+ self.db = None
+ self.col = None
+
+ self.host_dict = {}
+ self.svc_dict = {}
+
+
+ # Called by Broker so we can do init stuff
+ # TODO: add conf param to get pass with init
+ # Conf from arbiter!
+ def init(self):
+ logger.info("[Trending broker] I init the %s server connection to %s:%s (%s)" % (self.get_name(), self.backend, self.uri, self.replica_set))
+ if self.replica_set:
+ self.con = ReplicaSetConnection(self.uri, replicaSet=self.replica_set, safe=True)
+ else:
+ # Old versions of pymongo do not know about fsync
+ self.con = Connection(self.uri, safe=True)
+
+ # Open a connection
+ self.db = getattr(self.con, self.database)
+ self.col = self.db['trending']
+
+
+ # For a perf_data like /=30MB;4899;4568;1234;0 /var=50MB;4899;4568;1234;0 /toto=
+ # return ('/', '30'), ('/var', '50')
+ def get_metric_and_value(self, perf_data):
+ res = []
+ s = perf_data.strip()
+ # Get all metrics non void
+ elts = s.split(' ')
+ metrics = [e for e in elts if e != '']
+
+ for e in metrics:
+ logger.debug("[Trending broker] Groking: %s" % str(e))
+ elts = e.split('=', 1)
+ if len(elts) != 2:
+ continue
+ name = self.illegal_char.sub('_', elts[0])
+
+ raw = elts[1]
+ # get metric value and its thresholds values if they exist
+ if ';' in raw and len(filter(None, raw.split(';'))) >= 3:
+ elts = raw.split(';')
+ name_value = {name: elts[0]}#, name + '_warn': elts[1], name + '_crit': elts[2]}
+ # get the first value of ;
+ else:
+ value = raw
+ name_value = {name: raw}
+ # bail out if needed
+ if name_value[name] == '':
+ continue
+
+ # Try to get the int/float in it :)
+ for key, value in name_value.items():
+ m = re.search("(-?\d*\.?\d*)(.*)", value)
+ if m:
+ name_value[key] = m.groups(0)[0]
+ else:
+ continue
+ logger.debug("[Trending broker] End of grok: %s, %s" % (name, str(value)))
+ for key, value in name_value.items():
+ res.append((key, value))
+ return res
+
+
+ # Ok a quick and dirty load computation
+ def quick_update(self, prev_val, new_val, m, interval):
+ l = Load(m=m, initial_value=prev_val)
+ l.update_load(new_val, interval)
+ return l.get_load()
+
+
+ def get_previous_chunk(self, wday, chunk_nb):
+ if chunk_nb == 0:
+ chunk_nb = self.nb_chunks - 1
+ wday -= 1
+ else:
+ chunk_nb -= 1
+ wday = wday % 7
+ return (wday, chunk_nb)
+
+ def get_key(self, hname, sdesc, metric, wday, chunk_nb):
+ return hname+'.'+sdesc+'.'+metric+'.'+'week'+'.'+str(wday)+'.'+'Vtrend'+'.'+str(chunk_nb)
+
+
+ def update_avg(self, wday, chunk_nb, l1, hname, sdesc, metric):
+ coll = self.col
+ key = self.get_key(hname, sdesc, metric, wday, chunk_nb)#hname+sdesc+metric+'week'+str(wday)+'Vtrend'+str(chunk_nb)
+ doc = coll.find_one({'_id' : key})
+ if not doc:
+ doc = {'hname':hname, 'sdesc':sdesc, 'metric':metric, 'cycle':'week',
+ 'wday':wday, 'chunk_nb':chunk_nb, 'Vtrend':l1, '_id':key,
+ 'VtrendSmooth':l1, 'VcurrentSmooth':l1, 'Vcurrent':l1
+ }
+ coll.save(doc)
+ else:
+ prev_val = doc['Vtrend']
+
+ new_Vtrend = self.quick_update(prev_val, l1, 5, 5)
+
+ # Now we smooth with the last value
+ # And by default, we are using the current value
+ new_VtrendSmooth = doc['VtrendSmooth']
+
+ prev_doc = None
+ cur_wday = wday
+ cur_chunk_nb = chunk_nb
+ # Ok by default take the current avg
+ prev_val = doc['VtrendSmooth']
+ prev_val_short = doc['VcurrentSmooth']
+
+ cur_wday, cur_chunk_nb = self.get_previous_chunk(cur_wday, cur_chunk_nb)
+ prev_key = self.get_key(hname, sdesc, metric, cur_wday, cur_chunk_nb)#hname+sdesc+metric+'week'+str(cur_wday)+'Vtrend'+str(cur_chunk_nb)
+ prev_doc = coll.find_one({'_id' : prev_key})
+ if prev_doc:
+ prev_val = prev_doc['VtrendSmooth']
+ prev_val_short = prev_doc['VcurrentSmooth']
+ else:
+ print "OUPS, the key", key, "do not have a previous entry", cur_wday, cur_chunk_nb
+
+ # Ok really update the value now
+ print "WFT?", prev_val, new_Vtrend, type(prev_val), type(new_Vtrend)
+
+ new_VtrendSmooth = self.quick_update(prev_val, new_Vtrend, 1, 5)
+
+ # Ok and now last minutes trending
+ new_VcurrentSmooth = self.quick_update(prev_val_short, l1, 1, 15)
+ d = (abs(new_VcurrentSmooth - new_VtrendSmooth)/float(new_VtrendSmooth)) * 100
+
+ coll.update({'_id' : key}, {'$set' : { 'Vtrend': new_Vtrend, 'VtrendSmooth': new_VtrendSmooth, 'VcurrentSmooth' : new_VcurrentSmooth, 'Vcurrent':l1 }})
+
+
+
+ # Prepare service custom vars
+ def manage_initial_service_status_brok(self, b):
+ policies = b.data['trending_policies']
+ if policies:
+ self.svc_dict[(b.data['host_name'], b.data['service_description'])] = policies
+
+
+ # Prepare host custom vars
+ def manage_initial_host_status_brok(self, b):
+ policies = b.data['trending_policies']
+ if policies:
+ self.host_dict[b.data['host_name']] = policies
+
+
+ # A service check result brok has just arrived, we UPDATE data info with this
+ def manage_service_check_result_brok(self, b):
+ data = b.data
+
+ # Maybe this service is just unknown and without policies, if so, bail out
+ policies = self.svc_dict.get((data['host_name'], data['service_description']), [])
+ if not policies:
+ return
+
+ # Ok there are some real policies
+ print "OK POLICIES FOR", (data['host_name'], data['service_description']), policies
+
+ perf_data = data['perf_data']
+ couples = self.get_metric_and_value(perf_data)
+
+ # If no values, we can exit now
+ if len(couples) == 0:
+ return
+
+
+ hname = data['host_name']#self.illegal_char.sub('_', data['host_name'])
+ #if data['host_name'] in self.host_dict:
+ # customs_datas = self.host_dict[data['host_name']]
+ # if '_GRAPHITE_PRE' in customs_datas:
+ # hname = ".".join((customs_datas['_GRAPHITE_PRE'], hname))
+
+ sdesc = data['service_description']#self.illegal_char.sub('_', data['service_description'])
+ #if (data['host_name'], data['service_description']) in self.svc_dict:
+ # customs_datas = self.svc_dict[(data['host_name'], data['service_description'])]
+ # if '_GRAPHITE_POST' in customs_datas:
+ # desc = ".".join((desc, customs_datas['_GRAPHITE_POST']))
+
+ check_time = int(data['last_chk'])
+
+ logger.debug("[Trending broker] Hostname: %s, Desc: %s, check time: %d, perfdata: %s, policies: %s" % (hname, sdesc, check_time, str(perf_data), policies))
+
+ # Ok now the real stuff is here
+ for p in policies:
+ for (metric, value) in couples:
+ try:
+ value = float(value)
+ except ValueError:
+ return
+ if value is not None:
+ print "DUMPING", (metric, value), "for", p
+
+ sec_from_morning = get_sec_from_morning(check_time)
+ wday = get_wday(check_time)
+
+ chunk_nb = sec_from_morning / self.chunk_interval
+
+ # Now update mongodb
+ print "UPDATING DB", wday, chunk_nb, value, hname, sdesc, metric, type(value)
+ self.update_avg(wday, chunk_nb, value, hname, sdesc, metric)
+
+
+
+ # A host check result brok has just arrived, we UPDATE data info with this
+ def manage_host_check_result_brok(self, b):
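+ # NOTE: host trending is not implemented yet; the early return below
+ # skips code left over from the Graphite broker module.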
+ return
+
+ data = b.data
+
+ perf_data = data['perf_data']
+ couples = self.get_metric_and_value(perf_data)
+
+ # If no values, we can exit now
+ if len(couples) == 0:
+ return
+
+ hname = self.illegal_char.sub('_', data['host_name'])
+ if data['host_name'] in self.host_dict:
+ customs_datas = self.host_dict[data['host_name']]
+ if '_GRAPHITE_PRE' in customs_datas:
+ hname = ".".join((customs_datas['_GRAPHITE_PRE'], hname))
+
+ check_time = int(data['last_chk'])
+
+ logger.debug("[Graphite broker] Hostname %s, check time: %d, perfdata: %s" % (hname, check_time, str(perf_data)))
+
+ if self.graphite_data_source:
+ path = '.'.join((hname, self.graphite_data_source))
+ else:
+ path = hname
+
+ if self.use_pickle:
+ # Buffer the performance data lines
+ for (metric, value) in couples:
+ if value:
+ self.buffer.append(("%s.__HOST__.%s" % (path, metric),
+ ("%d" % check_time,
+ "%s" % value)))
+ else:
+ lines = []
+ # Send a bulk of all metrics at once
+ for (metric, value) in couples:
+ if value:
+ lines.append("%s.__HOST__.%s %s %d" % (path, metric,
+ value, check_time))
+ packet = '\n'.join(lines) + '\n' # Be sure we put \n every where
+ logger.debug("[Graphite broker] Launching: %s" % packet)
+ self.con.sendall(packet)
+
+
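To make the storage layout concrete: each weekday is cut into 86400 / chunk_interval chunks (96 with the default 900s), and every (host, service, metric, weekday, chunk) tuple maps to a single MongoDB document whose _id follows get_key() above. A minimal sketch of the addressing (host/service/metric names are hypothetical):

    import time

    chunk_interval = 900
    t_lt = time.localtime(time.time())
    sec_from_morning = t_lt.tm_hour * 3600 + t_lt.tm_min * 60 + t_lt.tm_sec
    wday = t_lt.tm_wday                          # Monday == 0
    chunk_nb = sec_from_morning / chunk_interval
    print 'srv1.Load.load1.week.%d.Vtrend.%d' % (wday, chunk_nb)
    # e.g. srv1.Load.load1.week.0.Vtrend.37 on a Monday around 09:15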
shinken/objects/host.py
@@ -133,6 +133,9 @@ class Host(SchedulingItem):
'trigger': StringProp(default=''),
'trigger_name': ListProp(default=''),
+ # Trending
+ 'trending_policies': ListProp(default='', fill_brok=['full_status']),
+
})
# properties set only for running purpose
shinken/objects/service.py
@@ -131,6 +131,11 @@ class Service(SchedulingItem):
# Load some triggers
'trigger': StringProp(default=''),
'trigger_name': ListProp(default=''),
+
+ # Trending
+ 'trending_policies': ListProp(default='', fill_brok=['full_status']),
+
+
})
# properties used in the running state
shinken/util.py
@@ -92,6 +92,12 @@ def get_day(t):
return int(t - get_sec_from_morning(t))
+# Same but for week day
+def get_wday(t):
+ t_lt = time.localtime(t)
+ return t_lt.tm_wday
+
+
# @memoized
def get_sec_from_morning(t):
t_lt = time.localtime(t)