#!/usr/bin/env python
"""
ratchet-agent: agent to monitor log files and send notices to Ratchet.io
"""
import codecs
import ConfigParser
import copy
import fnmatch
import hashlib
import json
import logging
import optparse # instead of argparse, for python2.6 compat
import os
import re
import shelve
import signal
import socket
import stat
import sys
import threading
import time
import requests
log = logging.getLogger(__name__)
VERSION = '0.2.2'
DEFAULT_ENDPOINT = 'https://submit.ratchet.io/api/1/item/'
DEFAULT_TIMEOUT = 3 # in seconds
DEFAULT_DATEFMT = '%Y-%m-%d %H:%M:%S,%f'
# Maps lowercased level names to numeric levels. The truncated keys 'notse',
# 'warni', and 'criti' match level names cut to five characters by formats
# like %(levelname)-5.5s.
LOG_LEVEL = {
    'notset': 0,
    'notse': 0,
    'debug': 10,
    'info': 20,
    'warning': 30,
    'warni': 30,
    'error': 40,
    'critical': 50,
    'criti': 50,
}
## utils
def parse_timestamp(format, s):
try:
ts = time.mktime(time.strptime(s, format))
except ValueError:
# fall back to current timestamp
ts = time.time()
return int(ts)
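# For example (illustrative): parse_timestamp('%Y-%m-%d %H:%M:%S', '2013-01-15 10:30:00')
# returns the matching local-time unix timestamp as an int; unparseable strings
# fall back to the current time.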
def find_filenames(app_config):
"""
Returns a list of all the filenames to process from the specified app
"""
filenames = []
for target in app_config['targets']:
if os.path.isfile(target):
if should_process_file(app_config, target):
filenames.append(target)
elif os.path.isdir(target):
for relative_name in os.listdir(target):
filename = os.path.join(target, relative_name)
if os.path.isfile(filename) and should_process_file(app_config, filename):
filenames.append(filename)
return filenames
def should_process_file(app_config, filename):
"""
Returns True if we should process the specified filename for the specified app
"""
if not os.path.isfile(filename):
return False
if filename in app_config['blacklist']:
return False
if filename in app_config['targets']:
return True
ext = filename.split('.')[-1]
if app_config['ext_whitelist']:
return ext in app_config['ext_whitelist']
if ext in app_config['ext_blacklist']:
return False
return True
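# For example (illustrative): with ext_whitelist = ['log', 'ratchet'], a file
# '/var/log/myapp.log' is processed while '/var/log/myapp.tmp' is skipped,
# unless the latter was listed explicitly as a target.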
def build_python_log_format_parser(format, datefmt):
    """
    Parses a python logging format string and returns a (compiled regex, strptime format)
    tuple that can be used to parse log lines written with it.
    """
if not datefmt:
datefmt = DEFAULT_DATEFMT
python_name = r'[a-zA-Z_][a-zA-Z0-9_]*'
# dict of key: (match_name, regex_string)
known_keys = {
'%(asctime)s': ('timestamp', datefmt_to_regex(datefmt)),
'%(created)f': ('created', r'\d+\.\d+'),
'%(filename)s': ('filename', r'\S+'),
'%(funcName)s': ('function_name', python_name),
'%(levelname)s': ('level', r'[a-zA-Z_]+'),
'%(levelname)-5.5s': ('level', r'[a-zA-Z_][a-zA-Z_ ]{4}'),
'%(levelname)-8s': ('level', r'[a-zA-Z_]([a-zA-Z_ ]{7}|[a-zA-Z_]{8,})'),
'%(levelno)d': ('levelno', r'\d+'),
'%(lineno)d': ('lineno', r'\d+'),
'%(lineno)s': ('lineno', r'\d+'),
'%(module)s': ('module_name', python_name),
'%(message)s': ('title', '.*'),
        '%(name)s': ('name', r'\S+'),
        '%(pathname)s': ('pathname', r'\S+'),
        '%(process)d': ('pid', r'\d+'),
        '%(processName)s': ('process_name', r'\S+'),
        '%(relativeCreated)d': ('relative_created', r'\d+'),
        '%(thread)d': ('thread', r'\d+'),
        '%(threadName)s': ('thread_name', r'\S+'),
        '%(threadName)-20s': ('thread_name', r'\S+ {,19}'),
}
regex_str = '^' + re.escape(format) + '$'
for key, val in known_keys.iteritems():
search = re.escape(key)
replacement = "(?P<%s>%s)" % val
regex_str = regex_str.replace(search, replacement)
log.debug("Built regex string from format %s datefmt %s => %s", format, datefmt, regex_str)
return (re.compile(regex_str), datefmt)
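# For example (illustrative), the format '%(asctime)s %(levelname)s %(message)s'
# with the default datefmt yields a regex of the shape:
#   ^(?P<timestamp>...)\ (?P<level>[a-zA-Z_]+)\ (?P<title>.*)$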
def datefmt_to_regex(datefmt):
    # the patterns below assume the en_US locale; they probably won't work for all locales.
replacements = {
'%a': r'[a-zA-Z]+', # locale's abbreviated weekday name
'%A': r'[a-zA-Z]+', # locale's full weekday name
'%b': r'[a-zA-Z]+', # locale's abbreviated month name
'%B': r'[a-zA-Z]+', # locale's full month name
'%c': r'[a-zA-Z]+ [a-zA-Z]+ \d{1,2} \d{2}:\d{2}:\d{2} \d{4}', # locale's appropriate date and time representation.
'%d': r'\d{2}', # day of the month [01,31]
'%f': r'\d+', # milliseconds
'%H': r'\d{2}', # hour (24-hour clock) [00,23]
'%I': r'\d{2}', # hour (12-hour clock) [01,12]
'%j': r'\d{3}', # day of the year as a decimal number [001,366]
'%m': r'\d{2}', # month [01,12]
'%M': r'\d{2}', # minute [00,59]
'%p': r'[A-Z]{2}', # locale's equivalent of either AM or PM
'%S': r'\d{2}', # second [00,61]
'%U': r'\d{2}', # week number of the year starting on Sundays [00,53]
'%w': r'\d', # weekday number [0,6]
'%W': r'\d{2}', # week number of the year starting on Mondays [00,53]
'%x': r'\d{2}/\d{2}/\d{2}', # locale's date representation
'%X': r'\d{2}:\d{2}:\d{2}', # locale's time representation
'%y': r'\d{2}', # year without century [00,99]
'%Y': r'\d{4}', # year with century
'%Z': r'[A-Z]{3}', # time zone name
}
for key, val in replacements.iteritems():
datefmt = datefmt.replace(key, val)
# last: replace %% with %
datefmt = datefmt.replace('%%', '%')
return datefmt
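# For example (illustrative): datefmt_to_regex('%Y-%m-%d %H:%M:%S,%f')
# returns r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d+'.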
## processors
def choose_processor(filename):
"""
Returns the Processor subclass that should be used to process the specified file
"""
if filename.endswith('.ratchet'):
return RatchetFileProcessor
return LogFileProcessor
class Processor(object):
"""
Base Processor class
"""
def __init__(self, scanner, app):
self.scanner = scanner
self.app = app
    def process(self, fp, filename, state):
        # subclasses implement this with the signature used by ScannerThread.scan_file()
        raise NotImplementedError()
def send_payload(self, payload):
# do immediate http post
# in the future, will do this with batches and single separate thread
if options.dry_run:
log.debug("Dry run; payload to send: %s", payload)
else:
log.debug("Sending payload: %s", payload)
config = self.app['config']
resp = requests.post(config['endpoint'], data=payload, timeout=config['timeout'])
if resp.status_code != 200:
log.warning("Unexpected response from Ratchet API. Code: %s Body: %s",
resp.status_code, resp.text)
class RatchetFileProcessor(Processor):
"""
Processor for .ratchet files
Each line is a json-encoded payload, so all we have to do is decode it and send it.
"""
    def process(self, fp, filename, state):
        for line in fp:
            # line is unicode (from codecs.open), so encode it before hashing
            log.debug("Read line. Length: %d Hash: %s", len(line),
                      hashlib.md5(line.encode('utf-8')).hexdigest())
            self._process_line(line)
def _process_line(self, line):
line = line.strip()
if not line:
log.debug("Skipping empty line")
return
try:
payload = json.loads(line)
except ValueError:
log.warning("Could not process badly formatted line: %s", line)
return
self.send_payload(payload)
class LogFileProcessor(Processor):
"""
Processor for general log files - currently works reasonably well for paste/pylons log files.
Some events we want will span multiple lines.
"""
    # default start-of-message parser, in the same dict shape as the entries in
    # config['_formats'] (process() reads parser['name'], parser['regex'], and
    # parser['datefmt'])
    _default_message_start_parser = {
        'name': 'default',
        'regex': re.compile(r'(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),(\d+)?\s+(?P<level>[A-Z]+)\s+(?P<title>.*$)'),
        'datefmt': '%Y-%m-%d %H:%M:%S',
    }
def __init__(self, *args, **kw):
super(LogFileProcessor, self).__init__(*args, **kw)
self._init_formatters()
def _init_formatters(self):
default_pattern_name = self.app['config']['log_format.default']
self.default_parser = self.scanner.config['_formats'].get(default_pattern_name,
self._default_message_start_parser)
# ordered list of tuples, in descending order of priority.
self.pattern_parsers = []
patterns_config = self.app['config']['log_format.patterns'].split('\n')
for pair in patterns_config:
pair = pair.strip()
if not pair:
continue
pattern, format = pair.split(" ", 1)
if format in self.scanner.config['_formats']:
pattern = re.compile(fnmatch.translate(pattern))
self.pattern_parsers.append((pattern, self.scanner.config['_formats'][format]))
def process(self, fp, filename, state):
empty_message = {'data': [], 'timestamp': None, 'level': None, 'title': None}
current_message = state.get('current_message', copy.deepcopy(empty_message))
# if this filename matches any of our parsers, use the appropriate parser.
# otherwise, use the default.
parser = self.default_parser
for pattern, _parser in self.pattern_parsers:
if pattern.search(filename):
parser = _parser
break
log.debug("File %s using parser %s", filename, parser['name'])
for line in fp:
# does this look like the beginning of a new log message?
match = parser['regex'].match(line)
if match:
if current_message['data']:
# done with the previous item - send it off and clear data
self._process_message(current_message, filename)
current_message = copy.deepcopy(empty_message)
# save interesting data from first line
current_message['timestamp'] = parse_timestamp(parser['datefmt'],
match.group('timestamp').strip())
current_message['level'] = match.group('level').strip()
current_message['title'] = match.group('title').strip()
if 'thread_name' in match.groupdict():
current_message['thread_name'] = match.group('thread_name').strip()
current_message['data'].append(line)
if self.scanner.scan_start_time - state['mtime'] > 1:
# it's been at least 1 second since anything was written to the file
# if there's a pending message, send it
if current_message['data']:
self._process_message(current_message, filename)
current_message = copy.deepcopy(empty_message)
state['current_message'] = current_message
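    # Note: a multi-line event (e.g. a traceback) is accumulated above because only
    # its first line matches the start-of-message regex; subsequent lines are
    # appended to current_message['data'] until the next match, or until the file
    # has been idle for a second.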
def _process_message(self, message, filename):
# if the level is below our threshold, ignore it
level_string = (message.get('level') or '').lower()
level = LOG_LEVEL.get(level_string, 0)
min_level = self.app['config']['min_log_level']
if level < min_level:
log.debug("Skipping message; level: %s min_level: %d", level_string, min_level)
return
payload = self._build_payload(message, filename)
self.send_payload(payload)
def _build_payload(self, message, filename):
message_data = "".join(message['data'])
log.debug("Building message payload. Timestamp: %s Level: %s Title: %s Data: %s",
message['timestamp'], message['level'], message['title'], message_data)
app_config = self.app['config']
# basic params
data = {
'timestamp': message['timestamp'],
'environment': app_config['params.environment'],
'level': message['level'].lower(),
'notifier': {
'name': 'ratchet-agent',
'ratchet_agent_app': self.app['name'],
'version': VERSION
}
}
if app_config.get('params.language'):
data['language'] = app_config['params.language']
if app_config.get('params.framework'):
data['framework'] = app_config['params.framework']
# message body
data['body'] = {
'message': {
'title': message['title'],
'body': message_data
}
}
# server environment
    data['server'] = {
        'log_file': filename,
        'host': self.app['host'],
        # branch and root are optional; .get() avoids a KeyError when unset
        'branch': app_config.get('params.branch'),
        'root': app_config.get('params.root'),
    }
payload = {
'access_token': self.app['config']['params.access_token'],
'data': data
}
return json.dumps(payload)
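# A serialized payload from LogFileProcessor._build_payload() looks roughly like
# (illustrative values):
#   {"access_token": "...",
#    "data": {"timestamp": 1357912345, "level": "error", "environment": "production",
#             "body": {"message": {"title": "...", "body": "..."}},
#             "server": {"log_file": "/var/log/myapp.log", "host": "web1", ...},
#             "notifier": {"name": "ratchet-agent", ...}}}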
## main thread and loop
class ScannerThread(threading.Thread):
"""
The main 'scanner' thread - scans files and posts items to the ratchet.io api.
There should only be a single instance of this thread.
"""
def __init__(self, stop_event, config):
super(ScannerThread, self).__init__()
self.stop_event = stop_event
self.config = config
self.apps = {}
for app_name, app_config in config.iteritems():
if app_name.startswith('_'):
continue
self.apps[app_name] = {
'name': app_name,
'config': app_config,
'host': socket.gethostname(),
}
def run(self):
sleep_seconds = self.config['_global']['sleep_time']
while not self.stop_event.is_set():
log.info("scanner thread looping...")
start_time = time.time()
try:
self.scan_all()
            except Exception:
                log.exception("Caught exception in ScannerThread.run() loop")
if options.dry_run:
break
# sleep for at most sleep_seconds seconds.
wait_time = (start_time + sleep_seconds) - time.time()
if wait_time > 0:
self.stop_event.wait(wait_time)
def scan_all(self):
# we keep state in a dictionary like:
# {'files': {'filename1': {'pos': 12345, 'inode': 4567}, ...}}
self.scan_start_time = time.time()
state = self.load_state()
apps_state = state.get('apps', {})
for app in self.apps.itervalues():
self.scan_app(app, apps_state)
# we've been mutating apps_state, but the shelf doesn't know about it.
# assign back, then close, to persist the changes.
state['apps'] = apps_state
self.save_state(state)
def load_state(self):
if not options.dry_run:
return shelve.open(self.config['_global']['statefile'])
else:
return {}
def save_state(self, state):
if not options.dry_run:
state.close()
def scan_app(self, app, apps_state):
log.debug("Scanning app %s", app['name'])
app_state = apps_state.setdefault(app['name'], {})
files_state = app_state.setdefault('files', {})
filenames = find_filenames(app['config'])
for filename in filenames:
log.debug("Processing file %s", filename)
stats = os.stat(filename)
inode = stats[stat.ST_INO]
mtime = stats[stat.ST_MTIME]
            if filename in files_state:
                # filename we've seen before.
                if inode != files_state[filename]['inode']:
                    # file has been rotated. reset to position 0 and store new inode.
                    log.debug("File %s appears to have been rotated.", filename)
                    files_state[filename] = {'pos': 0, 'inode': inode, 'mtime': mtime}
                else:
                    # same file; refresh the mtime so the idle-file check in
                    # LogFileProcessor.process() compares against the latest write
                    files_state[filename]['mtime'] = mtime
            else:
                # new file - initialize
                log.debug("Initializing new file %s", filename)
                files_state[filename] = {'pos': 0, 'inode': inode, 'mtime': mtime}
self.scan_file(app, filename, files_state[filename])
log.debug("Completed scan of app %s", app['name'])
def scan_file(self, app, filename, file_state):
log.debug("Scanning file %s", filename)
processor = choose_processor(filename)(self, app)
with codecs.open(filename, 'r', encoding='utf-8', errors='replace') as fp:
pos = file_state['pos']
log.debug("File %s seeking to pos %d", filename, pos)
fp.seek(pos)
processor.process(fp, filename, file_state)
new_pos = fp.tell()
log.debug("File %s new pos %d", filename, new_pos)
file_state['pos'] = new_pos
def register_signal_handlers(stop_event):
def signal_handler(signum, frame):
log.info("Shutting down...")
stop_event.set()
for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGQUIT, signal.SIGALRM):
signal.signal(sig, signal_handler)
def main_loop():
stop_event = threading.Event()
register_signal_handlers(stop_event)
scanner = ScannerThread(stop_event, config)
scanner.start()
    # sleep until the thread is killed.
    # have to sleep in a loop, instead of scanner.join(), otherwise we'd never receive the signals
    while scanner.isAlive():
time.sleep(1)
log.info("Shutdown complete")
## configuration
def build_option_parser():
parser = optparse.OptionParser()
parser.add_option('-c', '--config', dest='config_file', action='store',
default='ratchet-agent.conf', help='Path to configuration file. Default: ratchet-agent.conf in the working directory.')
parser.add_option('--dry-run', dest='dry_run', action='store_true', default=False,
help='Dry run: processes log files, but does not save state or submit events to Ratchet. Exits after processing once.')
# verbosity
verbosity = optparse.OptionGroup(parser, 'Verbosity')
verbosity.add_option('-v', '--verbose', dest='verbose', action='store_true', default=False,
help='Verbose output (uses log level DEBUG)')
verbosity.add_option('-q', '--quiet', dest='quiet', action='store_true', default=False,
help='Quiet output (uses log level WARNING)')
parser.add_option_group(verbosity)
return parser
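# A minimal example config file (all values illustrative; see the defaults and
# parsers below for the full set of recognized options):
#
#   [app:myapp]
#   params.access_token = YOUR_ACCESS_TOKEN
#   params.environment = production
#   targets = /var/log/myapp.log /var/log/myapp.d
#
#   [format:myformat]
#   type = python
#   format = %(asctime)s %(levelname)s %(message)s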
def parse_config(filename):
defaults = {
'statefile': '/var/cache/ratchet-agent.state',
'sleep_time': '10',
'endpoint': DEFAULT_ENDPOINT,
'timeout': str(DEFAULT_TIMEOUT),
'root': os.getcwd(),
        'ext_whitelist': 'log ratchet',
'ext_blacklist': '',
'targets': '',
'blacklist': '',
'log_format.default': '',
'log_format.patterns': '',
}
def to_int(val):
return int(val)
    def to_list(val):
        # split on whitespace, dropping empty strings (e.g. from an empty default)
        return [item for item in re.split(r'\s+', val) if item]
def to_log_level(val):
return LOG_LEVEL[val.lower()]
parsers = {
'sleep_time': to_int,
'timeout': to_int,
'ext_whitelist': to_list,
'ext_blacklist': to_list,
'targets': to_list,
'blacklist': to_list,
'max_file_page': to_int,
'min_log_level': to_log_level,
}
cp = ConfigParser.SafeConfigParser(defaults)
cp.read([filename])
config = {'_formats': {}}
for section_name in cp.sections():
if section_name.startswith('app:'):
app_name = section_name[len('app:'):]
app = {'name': app_name}
for option_name, raw_value in cp.items(section_name):
if option_name in parsers:
value = parsers[option_name](raw_value)
else:
value = raw_value
app[option_name] = value
config[app_name] = app
elif section_name.startswith('format:'):
format_name = section_name[len('format:'):]
format = {'name': format_name}
format_type = cp.get(section_name, 'type')
            format_spec = cp.get(section_name, 'format', raw=True)
            try:
                format_datefmt = cp.get(section_name, 'datefmt', raw=True)
except ConfigParser.NoOptionError:
format_datefmt = DEFAULT_DATEFMT
if format_type != 'python':
log.warning("Unrecognized format type: %s", format_type)
continue
regex, datefmt = build_python_log_format_parser(format_spec, format_datefmt)
format['regex'] = regex
format['datefmt'] = datefmt
config['_formats'][format_name] = format
global_config = cp.defaults()
config['_global'] = {}
for option_name, raw_value in global_config.iteritems():
if option_name in parsers:
value = parsers[option_name](raw_value)
else:
value = raw_value
config['_global'][option_name] = value
return config
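# The returned config dict is shaped roughly like (illustrative):
#   {'_global': {'sleep_time': 10, 'statefile': '/var/cache/ratchet-agent.state', ...},
#    '_formats': {'myformat': {'name': 'myformat', 'regex': <compiled regex>, 'datefmt': '...'}},
#    'myapp': {'name': 'myapp', 'params.access_token': '...', 'targets': [...], ...}}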
def validate_config(config):
errors = []
required_vars = ['params.access_token', 'targets']
for app_name, app_config in config.iteritems():
if app_name.startswith('_'):
continue
        for var_name in required_vars:
            if not app_config.get(var_name):
                errors.append("app:%s is missing required var %s" % (app_name, var_name))
if errors:
print "CONFIGURATION ERRORS"
for error in errors:
print error
print
sys.exit(1)
if __name__ == '__main__':
# first parse command-line options to get the path to the config file
parser = build_option_parser()
(options, args) = parser.parse_args()
# now parse the config file
config = parse_config(options.config_file)
# validate - will exit if invalid
validate_config(config)
# set up logging
level = logging.INFO
if options.verbose:
level = logging.DEBUG
elif options.quiet:
level = logging.WARNING
formatter = logging.Formatter("%(asctime)s %(levelname)-5.5s %(message)s")
handler = logging.StreamHandler()
handler.setFormatter(formatter)
handler.setLevel(level)
log.addHandler(handler)
log.setLevel(level)
# start main loop
main_loop()