Skip to content

Commit

Permalink
Merge pull request #9539 from jcollie/logstash-v1
Browse files Browse the repository at this point in the history
Support for Logstash v1 event schema (and improved timestamp handling)
  • Loading branch information
s0undt3ch committed Jan 3, 2014
2 parents 6d9986f + d87a4be commit cd8ccb9
Showing 1 changed file with 108 additions and 24 deletions.
132 changes: 108 additions & 24 deletions salt/log/handlers/logstash_mod.py
Expand Up @@ -11,17 +11,18 @@
UDP Logging Handler
-------------------
In order to setup the datagram handler for `Logstash`_, please define on
the salt configuration file:
For versions of `Logstash`_ before 1.2.0:
In the salt configuration file:
.. code-block:: yaml
logstash_udp_handler:
host: 127.0.0.1
port: 9999
version: 0
On the `Logstash`_ configuration file you need something like:
In the `Logstash`_ configuration file:
.. code-block:: text
Expand All @@ -32,23 +33,45 @@
}
}
For version 1.2.0 of `Logstash`_ and newer:
In the salt configuration file:
.. code-block:: yaml
logstash_udp_handler:
host: 127.0.0.1
port: 9999
version: 1
In the `Logstash`_ configuration file:
.. code-block:: text
input {
udp {
port => 9999
codec => json
}
}
Please read the `UDP input`_ configuration page for additional information.
ZeroMQ Logging Handler
----------------------
In order to setup the ZMQ handler for `Logstash`_, please define on the
salt configuration file:
For versions of `Logstash`_ before 1.2.0:
In the salt configuration file:
.. code-block:: yaml
logstash_zmq_handler:
address: tcp://127.0.0.1:2021
version: 0
On the `Logstash`_ configuration file you need something like:
In the `Logstash`_ configuration file:
.. code-block:: text
Expand All @@ -63,6 +86,27 @@
}
}
For version 1.2.0 of `Logstash`_ and newer:
In the salt configuration file:
.. code-block:: yaml
logstash_zmq_handler:
address: tcp://127.0.0.1:2021
version: 1
In the `Logstash`_ configuration file:
.. code-block:: text
input {
zeromq {
topology => "pubsub"
address => "tcp://0.0.0.0:2021"
codec => json
}
}
Please read the `ZeroMQ input`_ configuration page for additional
information.
Expand Down Expand Up @@ -123,13 +167,6 @@
from salt.log.setup import LOG_LEVELS
from salt.log.mixins import NewStyleClassMixIn

# Import 3rd-party libs
try:
from pytz import utc as _UTC
HAS_PYTZ = True
except ImportError:
HAS_PYTZ = False

log = logging.getLogger(__name__)

# Define the module's virtual name
Expand All @@ -151,18 +188,20 @@ def __virtual__():

def setup_handlers():
host = port = address = None
logstash_formatter = LogstashFormatter()

if 'logstash_udp_handler' in __opts__:
host = __opts__['logstash_udp_handler'].get('host', None)
port = __opts__['logstash_udp_handler'].get('port', None)
version = __opts__['logstash_udp_handler'].get('version', 0)

if host is None and port is None:
log.debug(
'The required \'logstash_udp_handler\' configuration keys, '
'\'host\' and/or \'port\', are not properly configured. Not '
'configuring the logstash UDP logging handler.'
)
else:
logstash_formatter = LogstashFormatter(version = version)
udp_handler = DatagramLogstashHandler(host, port)
udp_handler.setFormatter(logstash_formatter)
udp_handler.setLevel(
Expand All @@ -184,6 +223,7 @@ def setup_handlers():
if 'logstash_zmq_handler' in __opts__:
address = __opts__['logstash_zmq_handler'].get('address', None)
zmq_hwm = __opts__['logstash_zmq_handler'].get('hwm', 1000)
version = __opts__['logstash_zmq_handler'].get('version', 0)

if address is None:
log.debug(
Expand All @@ -192,6 +232,7 @@ def setup_handlers():
'configuring the logstash ZMQ logging handler.'
)
else:
logstash_formatter = LogstashFormatter(version = version)
zmq_handler = ZMQLogstashHander(address, zmq_hwm=zmq_hwm)
zmq_handler.setFormatter(logstash_formatter)
zmq_handler.setLevel(
Expand All @@ -215,20 +256,20 @@ def setup_handlers():


class LogstashFormatter(logging.Formatter, NewStyleClassMixIn):
def __init__(self, msg_type='logstash', msg_path='logstash'):
def __init__(self, msg_type='logstash', msg_path='logstash', version=0):
self.msg_path = msg_path
self.msg_type = msg_type
self.version = version
self.format = getattr(self, 'format_v{0}'.format(version))
super(LogstashFormatter, self).__init__(fmt=None, datefmt=None)

def formatTime(self, record, datefmt=None):
timestamp = datetime.datetime.utcfromtimestamp(record.created)
if HAS_PYTZ:
return _UTC.localize(timestamp).isoformat()
return '{0}+00:00'.format(timestamp.isoformat())
return datetime.datetime.utcfromtimestamp(record.created).isoformat()[:-3] + 'Z'

def format(self, record):
def format_v0(self, record):
host = socket.getfqdn()
message_dict = {
'@timestamp': self.formatTime(record),
'@fields': {
'levelname': record.levelname,
'logger': record.name,
Expand All @@ -247,8 +288,7 @@ def format(self, record):
),
'@source_host': host,
'@source_path': self.msg_path,
'@tags': [],
'@timestamp': self.formatTime(record),
'@tags': ['salt'],
'@type': self.msg_type,
}

Expand Down Expand Up @@ -278,6 +318,50 @@ def format(self, record):
message_dict['@fields'][key] = repr(value)
return json.dumps(message_dict)

def format_v1(self, record):
message_dict = {
'@version': 1,
'@timestamp': self.formatTime(record),
'host': socket.getfqdn(),
'levelname': record.levelname,
'logger': record.name,
'lineno': record.lineno,
'pathname': record.pathname,
'process': record.process,
'threadName': record.threadName,
'funcName': record.funcName,
'processName': record.processName,
'message': record.getMessage(),
'tags': ['salt'],
'type': self.msg_type
}

if record.exc_info:
message_dict['exc_info'] = self.formatException(
record.exc_info
)

# Add any extra attributes to the message field
for key, value in record.__dict__.items():
if key in ('args', 'asctime', 'created', 'exc_info', 'exc_text',
'filename', 'funcName', 'id', 'levelname', 'levelno',
'lineno', 'module', 'msecs', 'msecs', 'message', 'msg',
'name', 'pathname', 'process', 'processName',
'relativeCreated', 'thread', 'threadName'):
# These are already handled above or not handled at all
continue

if value is None:
message_dict[key] = value
continue

if isinstance(value, (string_types, bool, dict, float, int, list)):
message_dict[key] = value
continue

message_dict[key] = repr(value)
return json.dumps(message_dict)


class DatagramLogstashHandler(logging.handlers.DatagramHandler):
'''
Expand Down

0 comments on commit cd8ccb9

Please sign in to comment.