implement new logstash format

Commit 21d6d452fbc6be320c39c26805485697a8f9d7a3 (1 parent: b317762), committed by @sebest on Feb 6, 2014
Showing with 171 additions and 126 deletions.
  1. +142 −113 pystash/formatter.py
  2. +22 −10 pystash/handlers.py
  3. +7 −3 pystash/server.py
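
The practical effect of the change is that each handler now builds its own formatter from a version argument instead of relying on setFormatter. A minimal usage sketch (the logger name, address, and extra field below are illustrative, not taken from this commit):

import logging
from pystash.handlers import LogstashUDPHandler

logger = logging.getLogger('example')
logger.setLevel(logging.INFO)

# version=1 selects LogstashFormatterVersion1 (flat event schema with @version: 1);
# version=0 falls back to LogstashFormatterVersion0 (@message / @fields layout).
logger.addHandler(LogstashUDPHandler('127.0.0.1', 5959, version=1))

logger.info('order processed', extra={'order_id': 42})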
pystash/formatter.py (255 lines changed)
@@ -1,120 +1,149 @@
-'''
-This library is provided to allow standard python
-logging to output log data as JSON formatted strings
-ready to be shipped out to logstash.
-
-https://github.com/exoscale/python-logstash-formatter
-'''
+# https://github.com/vklochan/python-logstash/blob/master/logstash/formatter.py
+import traceback
import logging
import socket
-import datetime
-import traceback as tb
-import json
-
-def _default_json_default(obj):
- """
- Coerce everything to strings.
- All objects representing time get output as ISO8601.
- """
- if isinstance(obj, datetime.datetime) or \
- isinstance(obj,datetime.date) or \
- isinstance(obj,datetime.time):
- return obj.isoformat()
- else:
- return str(obj)
-
-class LogstashFormatter(logging.Formatter):
- """
- A custom formatter to prepare logs to be
- shipped out to logstash.
- """
-
- def __init__(self,
- fmt=None,
- datefmt=None,
- json_cls=None,
- json_default=_default_json_default):
- """
- :param fmt: Config as a JSON string, allowed fields;
- extra: provide extra fields always present in logs
- source_host: override source host name
- :param datefmt: Date format to use (required by logging.Formatter
- interface but not used)
- :param json_cls: JSON encoder to forward to json.dumps
- :param json_default: Default JSON representation for unknown types,
- by default coerce everything to a string
- """
-
- if fmt is not None:
- self._fmt = json.loads(fmt)
+import sys
+from datetime import datetime
+try:
+ import json
+except ImportError:
+ import simplejson as json
+
+
+class LogstashFormatterBase(logging.Formatter):
+ def __init__(self, message_type='Logstash', tags=None, fqdn=False):
+ self.message_type = message_type
+ self.tags = tags if tags is not None else []
+ self.extra_tags = []
+
+ if fqdn:
+ self.host = socket.getfqdn()
else:
- self._fmt = {}
- self.json_default = json_default
- self.json_cls = json_cls
- if 'extra' not in self._fmt:
- self.defaults = {}
+ self.host = socket.gethostname()
+
+ def get_extra_fields(self, record):
+ # The list contains all the attributes listed in
+ # http://docs.python.org/library/logging.html#logrecord-attributes
+ skip_list = (
+ 'args', 'asctime', 'created', 'exc_info', 'exc_text', 'filename',
+ 'funcName', 'id', 'levelname', 'levelno', 'lineno', 'module',
+ 'msecs', 'msecs', 'message', 'msg', 'name', 'pathname', 'process',
+ 'processName', 'relativeCreated', 'thread', 'threadName', 'extra')
+
+ if sys.version_info < (3, 0):
+ easy_types = (basestring, bool, dict, float, int, list, type(None))
else:
- self.defaults = self._fmt['extra']
- if 'source_host' in self._fmt:
- self.source_host = self._fmt['source_host']
+ easy_types = (str, bool, dict, float, int, list, type(None))
+
+ fields = {}
+
+ self.extra_tags = []
+ for key, value in record.__dict__.items():
+ if key not in skip_list:
+ if key == 'tags' and isinstance(value, list):
+ self.extra_tags = value
+ elif isinstance(value, easy_types):
+ fields[key] = value
+ else:
+ fields[key] = repr(value)
+
+ return fields
+
+ def get_debug_fields(self, record):
+ fields = {
+ 'exc_info': self.format_exception(record.exc_info),
+ 'lineno': record.lineno,
+ 'process': record.process,
+ 'threadName': record.threadName,
+ }
+
+ # funcName was added in 2.5
+ if not getattr(record, 'funcName', None):
+ fields['funcName'] = record.funcName
+
+ # processName was added in 2.6
+ if not getattr(record, 'processName', None):
+ fields['processName'] = record.processName
+
+ return fields
+
+ @classmethod
+ def format_source(cls, message_type, host, path):
+ return "%s://%s/%s" % (message_type, host, path)
+
+ @classmethod
+ def format_timestamp(cls, time):
+ return datetime.utcfromtimestamp(time).isoformat() + 'Z'
+
+ @classmethod
+ def format_exception(cls, exc_info):
+ return ''.join(traceback.format_exception(*exc_info)) if exc_info else ''
+
+ @classmethod
+ def serialize(cls, message):
+ if sys.version_info < (3, 0):
+ return json.dumps(message)
else:
- try:
- self.source_host = socket.getfqdn()
- except:
- self.source_host = ""
+ return bytes(json.dumps(message), 'utf-8')
+
+class LogstashFormatterVersion0(LogstashFormatterBase):
+ version = 0
def format(self, record):
- """
- Format a log record to JSON, if the message is a dict
- assume an empty message and use the dict as additional
- fields.
- """
-
- fields = record.__dict__.copy()
-
- if isinstance(record.msg, dict):
- fields.update(record.msg)
- fields.pop('msg')
- msg = ""
- else:
- msg = record.getMessage()
-
- if 'msg' in fields:
- fields.pop('msg')
-
- if 'exc_info' in fields:
- if fields['exc_info']:
- formatted = tb.format_exception(*fields['exc_info'])
- fields['exception'] = formatted
- fields.pop('exc_info')
-
- if 'exc_text' in fields and not fields['exc_text']:
- fields.pop('exc_text')
-
- logr = self.defaults.copy()
-
- logr.update({'@message': msg,
- '@timestamp': datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ'),
- '@source_host': self.source_host,
- '@fields': self._build_fields(logr, fields)})
-
- return json.dumps(logr, default=self.json_default, cls=self.json_cls)
-
- def _build_fields(self, defaults, fields):
- """Return provided fields including any in defaults
-
- >>> f = LogstashFormatter()
- # Verify that ``fields`` is used
- >>> f._build_fields({}, {'foo': 'one'}) == \
- {'foo': 'one'}
- True
- # Verify that ``@fields`` in ``defaults`` is used
- >>> f._build_fields({'@fields': {'bar': 'two'}}, {'foo': 'one'}) == \
- {'foo': 'one', 'bar': 'two'}
- True
- # Verify that ``fields`` takes precedence
- >>> f._build_fields({'@fields': {'foo': 'two'}}, {'foo': 'one'}) == \
- {'foo': 'one'}
- True
- """
- return dict(defaults.get('@fields', {}).items() + fields.items())
+ # Create message dict
+ message = {
+ '@timestamp': self.format_timestamp(record.created),
+ '@message': record.getMessage(),
+ '@source': self.format_source(self.message_type, self.host,
+ record.pathname),
+ '@source_host': self.host,
+ '@source_path': record.pathname,
+ '@tags': self.tags[:],
+ '@type': self.message_type,
+ '@fields': {
+ 'levelname': record.levelname,
+ 'logger': record.name,
+ },
+ }
+
+ # Add extra fields
+ message['@fields'].update(self.get_extra_fields(record))
+
+ if self.extra_tags:
+ message['@tags'].extend(self.extra_tags)
+
+ # If exception, add debug info
+ if record.exc_info:
+ message['@fields'].update(self.get_debug_fields(record))
+
+ return self.serialize(message)
+
+
+class LogstashFormatterVersion1(LogstashFormatterBase):
+ def format(self, record):
+ # Create message dict
+ message = {
+ '@timestamp': self.format_timestamp(record.created),
+ '@version': 1,
+ 'message': record.getMessage(),
+ 'host': self.host,
+ 'path': record.pathname,
+ 'tags': self.tags[:],
+ 'type': self.message_type,
+
+ # Extra Fields
+ 'levelname': record.levelname,
+ 'logger': record.name,
+ }
+
+ # Add extra fields
+ message.update(self.get_extra_fields(record))
+
+ if self.extra_tags:
+ message['tags'].extend(self.extra_tags)
+
+ # If exception, add debug info
+ if record.exc_info:
+ message.update(self.get_debug_fields(record))
+
+ return self.serialize(message)
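
To make the two schemas concrete, here is a rough sketch of how the formatters could be exercised by hand; the record contents and the abbreviated output in the comments are illustrative, and on Python 3 serialize() returns bytes rather than str.

import logging
from pystash.formatter import LogstashFormatterVersion0, LogstashFormatterVersion1

# Build a bare LogRecord by hand (all values are made up for the example).
record = logging.LogRecord('example', logging.INFO, '/srv/app.py', 42,
                           'hello', None, None)

print(LogstashFormatterVersion0('logstash').format(record))
# {"@timestamp": "...", "@message": "hello", "@source": "logstash://<host>//srv/app.py",
#  "@source_host": "<host>", "@source_path": "/srv/app.py", "@tags": [],
#  "@type": "logstash", "@fields": {"levelname": "INFO", "logger": "example", ...}}

print(LogstashFormatterVersion1('logstash').format(record))
# {"@timestamp": "...", "@version": 1, "message": "hello", "host": "<host>",
#  "path": "/srv/app.py", "tags": [], "type": "logstash",
#  "levelname": "INFO", "logger": "example", ...}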
pystash/handlers.py (32 lines changed)
@@ -1,18 +1,33 @@
import redis
from logging import Handler
from logging.handlers import DatagramHandler, SocketHandler
+from . import formatter
class LogstashTCPHandler(SocketHandler):
+ def __init__(self, host, port=5959, message_type='logstash', fqdn=False, version=1):
+ SocketHandler.__init__(self, host, port)
+ if version == 1:
+ self.formatter = formatter.LogstashFormatterVersion1(message_type, [], fqdn)
+ else:
+ self.formatter = formatter.LogstashFormatterVersion0(message_type, [], fqdn)
+
def makePickle(self, record):
- return self.format(record)
+ return self.formatter.format(record)
class LogstashUDPHandler(DatagramHandler):
+ def __init__(self, host, port=5959, message_type='logstash', fqdn=False, version=1):
+ DatagramHandler.__init__(self, host, port)
+ if version == 1:
+ self.formatter = formatter.LogstashFormatterVersion1(message_type, [], fqdn)
+ else:
+ self.formatter = formatter.LogstashFormatterVersion0(message_type, [], fqdn)
+
def makePickle(self, record):
- return self.format(record)
+ return self.formatter.format(record)
class RedisHandler(Handler):
@@ -21,10 +36,14 @@ def __init__(self, host='localhost', port=6379, db=0, key='logstash'):
Handler.__init__(self)
self._key = key
self.r_server = redis.Redis(host)
+ if version == 1:
+ self.formatter = formatter.LogstashFormatterVersion1(message_type, [], fqdn)
+ else:
+ self.formatter = formatter.LogstashFormatterVersion0(message_type, [], fqdn)
def emit(self, record):
try:
- self.r_server.rpush(self._key, self.format(record))
+ self.r_server.rpush(self._key, self.formatter.format(record))
except (KeyboardInterrupt, SystemExit):
raise
except:
@@ -33,7 +52,6 @@ def emit(self, record):
if __name__ == '__main__':
import logging
- from .formatter import LogstashFormatter
lsip = '127.0.0.1'
@@ -42,12 +60,6 @@ def emit(self, record):
handler_udp = LogstashUDPHandler(lsip, 9021)
handler_redis = RedisHandler(host=lsip)
- formatter = LogstashFormatter()
-
- handler_tcp.setFormatter(formatter)
- handler_udp.setFormatter(formatter)
- handler_redis.setFormatter(formatter)
-
logger.addHandler(handler_tcp)
logger.addHandler(handler_udp)
logger.addHandler(handler_redis)
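
With the wire format now chosen in the handler constructor, switching between old and new logstash inputs is a one-argument change. A small sketch with assumed addresses (the RedisHandler hunk above also references version, message_type and fqdn, which are not parameters of its __init__ as shown, so the sketch sticks to the socket handlers):

from pystash.handlers import LogstashTCPHandler, LogstashUDPHandler

# New event schema (@version: 1) over TCP
tcp_new = LogstashTCPHandler('127.0.0.1', 9020, version=1)

# Legacy @message/@fields schema over UDP, reporting the fully qualified hostname
udp_old = LogstashUDPHandler('127.0.0.1', 9021, message_type='logstash',
                             fqdn=True, version=0)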
pystash/server.py (10 lines changed)
@@ -10,7 +10,7 @@
import logging.handlers
import gevent
from gevent.server import DatagramServer, StreamServer
-from .formatter import LogstashFormatter
+from . import formatter
DEFAULT_UDP = logging.handlers.DEFAULT_UDP_LOGGING_PORT
DEFAULT_TCP = logging.handlers.DEFAULT_TCP_LOGGING_PORT
@@ -21,17 +21,21 @@
class Server(object):
- def __init__(self, bind_ip='127.0.0.1', tcp_port=DEFAULT_TCP, udp_port=DEFAULT_UDP, redis_host='localhost', redis_port=6379, redis_queue='logstash'):
+ def __init__(self, bind_ip='127.0.0.1', tcp_port=DEFAULT_TCP, udp_port=DEFAULT_UDP, redis_host='localhost', redis_port=6379, redis_queue='logstash', message_type='logstash', fqdn=True, version=1):
self.redis = redis.Redis(redis_host, redis_port)
self.redis_queue = redis_queue
- self.formatter = LogstashFormatter()
+ if version == 1:
+ self.formatter = formatter.LogstashFormatterVersion1(message_type, [], fqdn)
+ else:
+ self.formatter = formatter.LogstashFormatterVersion0(message_type, [], fqdn)
self.udp_server = DatagramServer('%s:%s' % (bind_ip, udp_port), self.udp_handle)
self.tcp_server = StreamServer('%s:%s' % (bind_ip, tcp_port), self.tcp_handle)
logging.info('Listening on %s (udp=%s tcp=%s) sending to %s:%s.', bind_ip, udp_port, tcp_port, redis_host, redis_port)
def obj_to_redis(self, obj):
record = logging.makeLogRecord(obj)
payload = self.formatter.format(record)
+ logger.debug('message %s', payload)
self.redis.rpush(self.redis_queue, payload)
def udp_handle(self, data, address):
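
For completeness, one way the relay could be wired up with the new constructor arguments. Everything below other than the parameter names shown in the diff is an assumption; in particular, the commit does not show how the two gevent servers are normally started.

from pystash.server import Server

# The server turns incoming records into logstash v1 JSON events and
# pushes them onto the 'logstash' redis list (see obj_to_redis above).
server = Server(bind_ip='0.0.0.0', redis_host='localhost',
                redis_queue='logstash', message_type='logstash',
                fqdn=True, version=1)

# Drive the gevent servers directly (the project may expose its own
# run helper that is not part of this diff).
server.tcp_server.start()
server.udp_server.serve_forever()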
