Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #6

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 16 additions & 0 deletions journal2gelf/__init__.py
Expand Up @@ -33,6 +33,12 @@ def main():
help="don't exclude fields, excluded by default")
parser.add_argument('-u', '--uppercase', action='store_true',
help="don't lower field names in output")
parser.add_argument('--message-json', action='store_true',
help="try to load json object from MESSAGE if found (may lead to key overlap)")
parser.add_argument('--convert-record',
help="custom python function like 'module.func_name' for record convertion")
parser.add_argument('--no-dup-underscore', action='store_true',
help="do not underscore journal _* fields second time (may lead to key overlap)")
parser.add_argument('--debug', action='store_true',
help="print GELF jsons to stdout")
parser.add_argument('--merge', action='store_true',
Expand All @@ -54,6 +60,16 @@ def main():
conv.debug = args.debug
conv.send = not args.dry_run
conv.lower = not args.uppercase
conv.message_json = args.message_json
conv.no_dup_underscore = args.no_dup_underscore
if args.convert_record:
# partly borrowed from werkzeug.utils.import_string
module_name, obj_name = args.convert_record.rsplit('.', 1)
module = __import__(module_name, globals(), locals(), [obj_name])
try:
conv.convert_record = getattr(module, obj_name)
except AttributeError as e:
raise ImportError(e)

cursor = load_cursor()

Expand Down
125 changes: 68 additions & 57 deletions journal2gelf/converter.py
Expand Up @@ -11,17 +11,17 @@

log = logging.getLogger(__name__)
default_exclude_fields = frozenset([
b'__MONOTONIC_TIMESTAMP',
b'_MACHINE_ID',
b'__CURSOR',
b'_SYSTEMD_CGROUP',
b'_AUDIT_SESSION',
b'_CAP_EFFECTIVE',
b'_SYSTEMD_SLICE',
b'_AUDIT_LOGINUID',
b'_SYSTEMD_OWNER_UID',
b'_SOURCE_REALTIME_TIMESTAMP',
b'_SYSTEMD_SESSION',
'__MONOTONIC_TIMESTAMP',
'_MACHINE_ID',
'__CURSOR',
'_SYSTEMD_CGROUP',
'_AUDIT_SESSION',
'_CAP_EFFECTIVE',
'_SYSTEMD_SLICE',
'_AUDIT_LOGINUID',
'_SYSTEMD_OWNER_UID',
'_SOURCE_REALTIME_TIMESTAMP',
'_SYSTEMD_SESSION',
])


Expand All @@ -35,6 +35,9 @@ def __init__(self, host, port, exclude_fields=set(), default_excludes=True):
self.send = True
self.lower = True
self.cursor = None
self.message_json = False
self.no_dup_underscore = False
self.convert_record = convert_record

def run(self, merge=False, cursor=None):
j = Reader()
Expand All @@ -60,7 +63,9 @@ def run(self, merge=False, cursor=None):

for record in j:
self.cursor = record['__CURSOR']
record = convert_record(record, excludes=self.exclude_fields, lower=self.lower)
record = self.convert_record(
record, excludes=self.exclude_fields, lower=self.lower,
no_dup_underscore=self.no_dup_underscore, message_json=self.message_json)
if self.send:
self.gelf.log(**record)
if self.debug:
Expand All @@ -69,7 +74,8 @@ def run(self, merge=False, cursor=None):

# See https://www.graylog.org/resources/gelf-2/#specs
# And http://www.freedesktop.org/software/systemd/man/systemd.journal-fields.html
def convert_record(src, excludes=set(), lower=True):
def convert_record(src, excludes=set(), lower=True, no_dup_underscore=False,
message_json=False):
for k, v in list(src.items()):
conv = field_converters.get(k)
if conv:
Expand All @@ -78,13 +84,19 @@ def convert_record(src, excludes=set(), lower=True):
except ValueError:
pass

if message_json and src.get('MESSAGE', b'').startswith(b'{"'):
try:
src.update({'_'+k: v for k, v in json.loads(src['MESSAGE'])}.items())
except json.JSONDecodeError:
pass

dst = {
b'version': b'1.1',
b'host': src.pop(b'_HOSTNAME', None),
b'short_message': src.pop(b'MESSAGE', None),
b'timestamp': src.pop(b'__REALTIME_TIMESTAMP', None),
b'level': src.pop(b'PRIORITY', None),
b'_facility': src.get(b'SYSLOG_IDENTIFIER') or src.get(b'_COMM')
'version': '1.1',
'host': src.pop('_HOSTNAME', None),
'short_message': src.pop('MESSAGE', b''),
'timestamp': src.pop('__REALTIME_TIMESTAMP', None),
'level': src.pop('PRIORITY', None),
'_facility': src.get('SYSLOG_IDENTIFIER') or src.get('_COMM')
}

for k, v in list(src.items()):
Expand All @@ -93,9 +105,11 @@ def convert_record(src, excludes=set(), lower=True):
if lower:
k = k.lower()
if k in system_fields:
k = b'_'+k
dst[b'_'+k] = v

k = '_'+k
if not no_dup_underscore or k[0] != '_':
dst['_'+k] = v
else:
dst[k] = v
return dst


Expand All @@ -104,45 +118,42 @@ def convert_timestamp(value):


def convert_monotonic_timestamp(value):
try:
return convert_timestamp(value[0])
except:
raise ValueError
return convert_timestamp(value[0])


field_converters = {
b'__MONOTONIC_TIMESTAMP': convert_monotonic_timestamp,
b'EXIT_STATUS': int,
b'_AUDIT_LOGINUID': int,
b'_PID': int,
b'COREDUMP_UID': int,
b'COREDUMP_SESSION': int,
b'SESSION_ID': int,
b'_SOURCE_REALTIME_TIMESTAMP': convert_timestamp,
b'_GID': int,
b'INITRD_USEC': int,
b'ERRNO': int,
b'SYSLOG_FACILITY': int,
b'__REALTIME_TIMESTAMP': convert_timestamp,
b'_SYSTEMD_SESSION': int,
b'_SYSTEMD_OWNER_UID': int,
b'COREDUMP_PID': int,
b'_AUDIT_SESSION': int,
b'USERSPACE_USEC': int,
b'PRIORITY': int,
b'KERNEL_USEC': int,
b'_UID': int,
b'SYSLOG_PID': int,
b'COREDUMP_SIGNAL': int,
b'COREDUMP_GID': int,
b'_SOURCE_MONOTONIC_TIMESTAMP': convert_monotonic_timestamp,
b'LEADER': int,
b'CODE_LINE': int
'__MONOTONIC_TIMESTAMP': convert_monotonic_timestamp,
'EXIT_STATUS': int,
'_AUDIT_LOGINUID': int,
'_PID': int,
'COREDUMP_UID': int,
'COREDUMP_SESSION': int,
'SESSION_ID': int,
'_SOURCE_REALTIME_TIMESTAMP': convert_timestamp,
'_GID': int,
'INITRD_USEC': int,
'ERRNO': int,
'SYSLOG_FACILITY': int,
'__REALTIME_TIMESTAMP': convert_timestamp,
'_SYSTEMD_SESSION': int,
'_SYSTEMD_OWNER_UID': int,
'COREDUMP_PID': int,
'_AUDIT_SESSION': int,
'USERSPACE_USEC': int,
'PRIORITY': int,
'KERNEL_USEC': int,
'_UID': int,
'SYSLOG_PID': int,
'COREDUMP_SIGNAL': int,
'COREDUMP_GID': int,
'_SOURCE_MONOTONIC_TIMESTAMP': convert_monotonic_timestamp,
'LEADER': int,
'CODE_LINE': int
}

system_fields = frozenset([
b'_id', # actually only _id and _uid are reserved in elasticsearch
b'_uid', # but for consistency we rename all this fields
b'_gid',
b'_pid',
'_id', # actually only _id and _uid are reserved in elasticsearch
'_uid', # but for consistency we rename all this fields
'_gid',
'_pid',
])
14 changes: 11 additions & 3 deletions journal2gelf/gelfclient.py
Expand Up @@ -15,6 +15,15 @@
log = logging.getLogger(__name__)


def safe_str(value):
    """Coerce *value* to text for JSON serialization.

    Bytes are decoded as UTF-8 when possible; anything else — including
    bytes that fail to decode — falls back to its ``str()`` representation.
    """
    if not isinstance(value, bytes):
        return str(value)
    try:
        return value.decode('utf-8')
    except ValueError:
        # Undecodable byte sequence: represent it the same way str() would.
        return str(value)


# Based on https://github.com/orionvm/python-gelfclient
class UdpClient(object):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
Expand Down Expand Up @@ -51,12 +60,11 @@ def log(self, **message):
else:
message['host'] = self.source

message_json = json.dumps(message, separators=(',', ':'), ensure_ascii=False)
output = zlib.compress(message_json)
message_json = json.dumps(message, separators=(',', ':'), default=safe_str, ensure_ascii=False)
output = zlib.compress(message_json.encode('utf-8'))
if len(output) > self.mtu:
for chunk in self.chunks(output):
self.sock.sendto(chunk, self.sockaddr)
else:
self.sock.sendto(output, self.sockaddr)

return message