Skip to content

Commit

Permalink
Fixes: from deploy test
Browse files Browse the repository at this point in the history
  • Loading branch information
redref committed Jul 7, 2016
1 parent 61c4c8e commit d235bde
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 86 deletions.
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def get_version():
data_files = [
('/etc/tantale', ['conf/tantale.conf.example']),
]
if os.path.isdir('/usr/lib/systemd'):
if os.path.isdir('/usr/lib/systemd/user'):
# Systemd script
data_files.append((
'/usr/lib/systemd/system', ['service/systemd/tantale.service']))
Expand Down Expand Up @@ -94,7 +94,7 @@ def run(self):
install.run(self)

if os.path.isfile('/etc/init.d/tantale'):
os.lchmod('/etc/init.d/tantale', 755)
os.chmod('/etc/init.d/tantale', 755)

setup(
name='tantale',
Expand Down
2 changes: 1 addition & 1 deletion src/tantale/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

import os

VERSION = "0.1.5"
VERSION = "0.1.6"

config_min = os.path.join(os.path.dirname(__file__), 'config_min.conf')
13 changes: 11 additions & 2 deletions src/tantale/backends/elasticsearch/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,25 +175,34 @@ def status_iterator(self):
res = self.elasticclient.mget(
body=json.dumps({"docs": body}),
index=self.status_index,
_source_include=('status', 'ack'),
_source_include=('status', 'ack', 'timestamp'),
refresh=True,
)

for doc in res['docs']:
check = checks.pop(0)

if 'found' in doc and doc['found'] is True:
# Do not erase previous checks with old data
if (check.timestamp * 1000) < doc['_source']['timestamp']:
continue

doc['doc'] = {}
doc['_op_type'] = 'update'
doc['doc']['output'] = check.output
doc['doc']['contacts'] = check.contacts
doc['doc']['check'] = check.check
doc['doc']['hostname'] = check.hostname

if check.status != doc['_source']['status']:
doc['doc']['status'] = check.status
doc['doc']['timestamp'] = check.timestamp * 1000

# Add a log entry of this change (no ack / last_check)
self.logs.append(doc['doc'])
log = {}
for f in check.log_fields:
log[f] = doc['doc'][f]
self.logs.append(log)

doc['doc']['ack'] = 0

Expand Down
47 changes: 21 additions & 26 deletions src/tantale/client/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,41 +54,36 @@ def connect(self):
except:
self.sock = None

def close(self):
if self.sock:
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
self.sock = None

def __del__(self):
self.close()

def sending_thread(self, res_q):
"""
Send results from queue
"""
self.connect()

while True:

result = res_q.get(True)
# Not keeping data in memory
res_q.task_done()

if not self.sock:
self.connect()

if not self.sock:
self.log.info(
'Reconnect to %s:%s failed' % (self.host, self.port))
continue

self.log.debug("Sending: %s" % result)

try:
self.sock.send(bytes(json.dumps(result) + '\n'))
result = res_q.get(True)
# Not keeping data in memory
res_q.task_done()

if not self.sock:
self.connect()

if not self.sock:
self.log.info(
'Reconnect to %s:%s failed' % (self.host, self.port))
continue

self.log.debug("Sending: %s" % result)

try:
self.sock.send(bytes(json.dumps(result) + '\n'))
except:
self.log.info("Connection reset")
self.log.debug(traceback.format_exc())
self.sock = None
except:
self.close()
self.log.error("Unknown error sending checks")

def run(self, init_done=None):
"""
Expand Down
116 changes: 65 additions & 51 deletions src/tantale/input/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,62 +82,76 @@ def sig_handler(signum, frame):

# Logic
queue_state = True
stack = ""
while self.running:
r = None
try:
r, w, e = select.select(connections, [], [])
except:
# Handle "Interrupted system call"
break
stack = {}

for sock in r:
if sock == s:
# New clients
sockfd, addr = s.accept()
connections.append(sockfd)
else:
if isinstance(sock, int):
data = os.read(sock, 4096)
try:
while self.running:
r = None
try:
r, w, e = select.select(connections, [], [])
except:
# Handle "Interrupted system call"
break

for sock in r:
if sock == s:
# New clients
sockfd, addr = s.accept()
connections.append(sockfd)
else:
data = sock.recv(4096)

if data == bytes(''):
# Disconnect
connections.remove(sock)

data = stack + data.decode('utf-8')

while True:
idx = data.find('\n')
if idx == -1:
stack = data
break
line = data[:idx]
data = data[idx + 1:]

if line == "END":
break
if isinstance(sock, int):
data = os.read(sock, 4096).decode('utf-8')
key = sock
else:
try:
check_queue.put(line, block=False)
except Full:
self.log.error('Queue full, dropping')
except (EOFError, IOError):
# Queue died
self.running = False
queue_state = False
data = sock.recv(4096).decode('utf-8')
key = sock.getpeername()

if data == '':
# Disconnect
del stack[key]
connections.remove(sock)
continue

if (
data[0] != '{' and
key in stack and stack[key] != ""
):
data = stack[key] + data.decode('utf-8')

while True:
idx = data.find('\n')
if idx == -1:
stack[key] = data
break
line = data[:idx]
data = data[idx + 1:]

# Stop backend
if queue_state:
try:
check_queue.put(None, block=False)
except:
pass

s.close()
self.log.info("Exit")
if line == "END":
break
else:
try:
check_queue.put(line, block=False)
except Full:
self.log.error('Queue full, dropping')
except (EOFError, IOError):
# Queue died
self.running = False
queue_state = False
break

# Stop backend
if queue_state:
try:
check_queue.put(None, block=False)
except:
pass

s.close()
self.log.info("Exit")
except:
self.log.critical("Fatal input error")
self.log.debug(traceback.format_exc())
check_queue.put(None, block=False)

def input_backend(self, check_queue):
if setproctitle:
Expand Down
1 change: 1 addition & 0 deletions src/tantale/livestatus/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
FIELDS_MAPPING = {
"state": "status",
"name": "hostname",
"alias": "hostname",
"host_name": "hostname",
"service_description": "check",
# No address here, only hostnames
Expand Down
3 changes: 2 additions & 1 deletion src/tantale/livestatus/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ def parse(self, string):

return keepalive, Query(method, table, **options)
except:
raise Exception(
self.log.warning('Livestatus query parse error')
self.log.debug(
'Error %s\nparsing line "%s" on query "%s"'
% (traceback.format_exc(), line, repr(string)))
10 changes: 7 additions & 3 deletions src/tantale/livestatus/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,14 @@ def append(self, result):
# log table specific
elif self.table == 'log':
if field == 'type':
if result['check'] == 'Host':
mapped_res.append('HOST ALERT')
if 'check' in result:
if result['check'] == 'Host':
mapped_res.append('HOST ALERT')
else:
mapped_res.append('SERVICE ALERT')
else:
mapped_res.append('SERVICE ALERT')
self.log.debug("Malformed event line %s" % result)
mapped_res.append('HOST ALERT')
continue

# Append
Expand Down

0 comments on commit d235bde

Please sign in to comment.