Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat struct log #2179

Merged
merged 28 commits into from May 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
d0307ea
Log storm to its own logger.
vEpiphyte May 3, 2021
3c6bc46
Structlog POC
vEpiphyte May 4, 2021
022813c
Correct filename
vEpiphyte May 4, 2021
431577a
Merge branch 'master' into feat_struct_log
vEpiphyte May 5, 2021
5d18ada
Add structlog test
vEpiphyte May 5, 2021
3330a18
Tweak test
vEpiphyte May 5, 2021
3b46c68
Fix tests, skip a test
vEpiphyte May 5, 2021
6fb200b
Branch build.
vEpiphyte May 5, 2021
617e10f
Use envbool
vEpiphyte May 5, 2021
1553ffd
Fix envbool
vEpiphyte May 5, 2021
30eb24b
Tweak attribute name for datadog
vEpiphyte May 6, 2021
419209a
Merge branch 'master' into feat_struct_log
vEpiphyte May 6, 2021
edeedbb
Fix unit test
vEpiphyte May 6, 2021
cad1474
Add unicode test, remove dead code.
vEpiphyte May 6, 2021
aa68b64
Use s_common.err()
vEpiphyte May 7, 2021
3dabd69
Protect against unserializable types.
vEpiphyte May 7, 2021
f9348d3
Nest exception information and logger information
vEpiphyte May 10, 2021
639d700
Add a generic command line hook; interpret it right away for setting …
vEpiphyte May 11, 2021
9170de7
Fix typo
vEpiphyte May 11, 2021
85103f9
Fix hideconf setting
vEpiphyte May 11, 2021
e875d0c
fix unit test mock
vEpiphyte May 11, 2021
cca01f0
Merge branch 'master' into feat_struct_log
vEpiphyte May 12, 2021
8fbcb6a
Merge branch 'master' into feat_struct_log
vEpiphyte May 13, 2021
3cc405b
Fix up test_cell_confprint
vEpiphyte May 13, 2021
8bc6dd2
Rip out extra outp usage.
vEpiphyte May 13, 2021
3a8bbdb
Remove thread from log record
vEpiphyte May 13, 2021
2eec576
Coverage
vEpiphyte May 13, 2021
41a14ed
Merge branch 'master' into feat_struct_log
invisig0th May 14, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .circleci/config.yml
Expand Up @@ -742,6 +742,7 @@ workflows:
branches:
only:
- master
- feat_struct_log

- gh-release/dorelease:
requires:
Expand Down
72 changes: 34 additions & 38 deletions synapse/common.py
Expand Up @@ -30,6 +30,7 @@
import synapse.exc as s_exc
import synapse.lib.const as s_const
import synapse.lib.msgpack as s_msgpack
import synapse.lib.structlog as s_structlog

class NoValu:
pass
Expand Down Expand Up @@ -438,39 +439,6 @@ def verstr(vtup):
'''
return '.'.join([str(v) for v in vtup])

def getexcfo(e):
'''
Get an err tufo from an exception.

Args:
e (Exception): An Exception (or Exception subclass).

Notes:
This can be called outside of the context of an exception handler,
however details such as file, line, function name and source may be
missing.

Returns:
((str, dict)):
'''
tb = sys.exc_info()[2]
tbinfo = traceback.extract_tb(tb)
path, line, name, src = '', '', '', None
if tbinfo:
path, line, name, sorc = tbinfo[-1]
retd = {
'msg': str(e),
'file': path,
'line': line,
'name': name,
'src': src
}

if isinstance(e, s_exc.SynErr):
retd['syn:err'] = e.errinfo

return (e.__class__.__name__, retd)

def excinfo(e):
'''
Populate err,errmsg,errtrace info from exc.
Expand Down Expand Up @@ -644,7 +612,17 @@ def makedirs(path, mode=0o777):
def iterzip(*args, fillvalue=None):
return itertools.zip_longest(*args, fillvalue=fillvalue)

def setlogging(mlogger, defval=None):
def _getLogConfFromEnv(defval=None, structlog=None):
if structlog:
structlog = 'true'
else:
structlog = 'false'
defval = os.getenv('SYN_LOG_LEVEL', defval)
structlog = envbool('SYN_LOG_STRUCT', structlog)
ret = {'defval': defval, 'structlog': structlog}
return ret

def setlogging(mlogger, defval=None, structlog=None):
'''
Configure synapse logging.

Expand All @@ -658,16 +636,28 @@ def setlogging(mlogger, defval=None):
Returns:
None
'''
log_level = os.getenv('SYN_LOG_LEVEL',
defval)
ret = _getLogConfFromEnv(defval, structlog)

log_level = ret.get('defval')
log_struct = ret.get('structlog')

if log_level: # pragma: no cover
if isinstance(log_level, str):
log_level = log_level.upper()
if log_level not in s_const.LOG_LEVEL_CHOICES:
raise ValueError('Invalid log level provided: {}'.format(log_level))
logging.basicConfig(level=log_level, format=s_const.LOG_FORMAT)

if log_struct:
handler = logging.StreamHandler()
formatter = s_structlog.JsonFormatter()
handler.setFormatter(formatter)
logging.basicConfig(level=log_level, handlers=(handler,))
else:
logging.basicConfig(level=log_level, format=s_const.LOG_FORMAT)
mlogger.info('log level set to %s', log_level)

return ret

syndir = os.getenv('SYN_DIR')
if syndir is None:
syndir = '~/.syn'
Expand Down Expand Up @@ -713,7 +703,7 @@ def result(retn):
info['errx'] = name
raise s_exc.SynErr(**info)

def err(e):
def err(e, fulltb=False):
name = e.__class__.__name__
info = {}

Expand All @@ -734,6 +724,12 @@ def err(e):
else:
info['mesg'] = str(e)

if fulltb:
s = traceback.format_exc()
if s[-1:] == "\n":
s = s[:-1]
info['etb'] = s

return (name, info)

def retnexc(e):
Expand Down
4 changes: 3 additions & 1 deletion synapse/cortex.py
Expand Up @@ -58,6 +58,7 @@
import synapse.lib.stormlib.modelext as s_stormlib_modelext # NOQA

logger = logging.getLogger(__name__)
stormlogger = logging.getLogger('synapse.storm')

'''
A Cortex implements the synapse hypergraph object.
Expand Down Expand Up @@ -4235,7 +4236,8 @@ def _logStormQuery(self, text, user):
'''
if self.conf.get('storm:log'):
lvl = self.conf.get('storm:log:level')
logger.log(lvl, 'Executing storm query {%s} as [%s]', text, user.name)
stormlogger.log(lvl, 'Executing storm query {%s} as [%s]', text, user.name,
extra={'synapse': {'text': text, 'username': user.name, 'user': user.iden}})

async def getNodeByNdef(self, ndef, view=None):
'''
Expand Down
66 changes: 37 additions & 29 deletions synapse/lib/cell.py
Expand Up @@ -131,7 +131,7 @@ def _iterBackupProc(path, linkinfo, done):
'''
# This logging call is okay to run since we're executing in
# our own process space and no logging has been configured.
s_common.setlogging(logger, linkinfo.get('loglevel'))
s_common.setlogging(logger, **linkinfo.get('logconf'))

asyncio.run(_iterBackupWork(path, linkinfo, done))

Expand Down Expand Up @@ -707,6 +707,11 @@ class Cell(s_nexus.Pusher, s_telepath.Aware):
}
},
'required': ('urlinfo', ),
},
'_log_conf': {
'description': 'Opaque structure used for logging by spawned processes.',
'type': 'object',
'hideconf': True
}
}

Expand Down Expand Up @@ -1212,7 +1217,7 @@ async def _execBackupTask(self, dirn):

mypipe, child_pipe = ctx.Pipe()
paths = [str(slab.path) for slab in slabs]
loglevel = logger.getEffectiveLevel()
logconf = await self._getSpawnLogConf()
proc = None

try:
Expand All @@ -1228,7 +1233,7 @@ async def _execBackupTask(self, dirn):

logger.debug('Starting backup process')

args = (child_pipe, self.dirn, dirn, paths, loglevel)
args = (child_pipe, self.dirn, dirn, paths, logconf)

def waitforproc1():
nonlocal proc
Expand Down Expand Up @@ -1262,12 +1267,12 @@ def waitforproc2():
proc.terminate()

@staticmethod
def _backupProc(pipe, srcdir, dstdir, lmdbpaths, loglevel):
def _backupProc(pipe, srcdir, dstdir, lmdbpaths, logconf):
'''
(In a separate process) Actually do the backup
'''
# This is a new process: configure logging
s_common.setlogging(logger, loglevel)
s_common.setlogging(logger, **logconf)

with s_t_backup.capturelmdbs(srcdir, onlydirs=lmdbpaths) as lmdbinfo:
pipe.send('captured')
Expand Down Expand Up @@ -1325,7 +1330,7 @@ async def iterBackupArchive(self, name, user):

link = s_scope.get('link')
linkinfo = await link.getSpawnInfo()
linkinfo['loglevel'] = logger.getEffectiveLevel()
linkinfo['logconf'] = await self._getSpawnLogConf()

await self.boss.promote('backup:stream', user=user, info={'name': name})

Expand Down Expand Up @@ -1383,7 +1388,7 @@ async def iterNewBackupArchive(self, user, name=None, remove=False):
await self.runBackup(name)
link = s_scope.get('link')
linkinfo = await link.getSpawnInfo()
linkinfo['loglevel'] = logger.getEffectiveLevel()
linkinfo['logconf'] = await self._getSpawnLogConf()

await self.boss.promote('backup:stream', user=user, info={'name': name})

Expand Down Expand Up @@ -1838,6 +1843,12 @@ async def getTeleApi(self, link, mesg, path):
async def getCellApi(self, link, user, path):
return await self.cellapi.anit(self, link, user)

async def _getSpawnLogConf(self):
conf = self.conf.get('_log_conf')
if conf:
return conf
return s_common._getLogConfFromEnv()

@classmethod
def getCellType(cls):
return cls.__name__.lower()
Expand Down Expand Up @@ -1890,6 +1901,8 @@ def getArgParser(cls, conf=None):

pars.add_argument('--log-level', default='INFO', choices=s_const.LOG_LEVEL_CHOICES,
help='Specify the Python logging log level.', type=str.upper)
pars.add_argument('--structured-logging', default=False, action='store_true',
help='Use structured logging.')

telendef = None
telepdef = 'tcp://0.0.0.0:27492'
Expand Down Expand Up @@ -1928,7 +1941,7 @@ async def initFromArgv(cls, argv, outp=None):

Args:
argv (list): A list of command line arguments to launch the Cell with.
outp (s_ouput.OutPut): Optional, an output object.
outp (s_ouput.OutPut): Optional, an output object. No longer used in the default implementation.

Notes:
This does the following items:
Expand All @@ -1950,8 +1963,9 @@ async def initFromArgv(cls, argv, outp=None):

opts = pars.parse_args(argv)

s_common.setlogging(logger, defval=opts.log_level)

logconf = s_common.setlogging(logger, defval=opts.log_level,
structlog=opts.structured_logging)
conf.setdefault('_log_conf', logconf)
conf.setConfFromOpts(opts)
conf.setConfFromEnvs()

Expand All @@ -1961,33 +1975,27 @@ async def initFromArgv(cls, argv, outp=None):

if 'dmon:listen' not in cell.conf:
await cell.dmon.listen(opts.telepath)
if outp is not None:
outp.printf(f'...{cell.getCellType()} API (telepath): %s' % (opts.telepath,))
logger.info(f'...{cell.getCellType()} API (telepath): {opts.telepath}')
else:
lisn = cell.conf.get('dmon:listen')
if lisn is None:
lisn = cell.getLocalUrl()

if outp is not None:
lisn = cell.conf.get('dmon:listen')
if lisn is None:
lisn = cell.getLocalUrl()

outp.printf(f'...{cell.getCellType()} API (telepath): %s' % (lisn,))
logger.info(f'...{cell.getCellType()} API (telepath): {lisn}')

if 'https:port' not in cell.conf:
await cell.addHttpsPort(opts.https)
if outp is not None:
outp.printf(f'...{cell.getCellType()} API (https): %s' % (opts.https,))
logger.info(f'...{cell.getCellType()} API (https): {opts.https}')
else:
if outp is not None:
port = cell.conf.get('https:port')
if port is None:
outp.printf(f'...{cell.getCellType()} API (https): disabled')
else:
outp.printf(f'...{cell.getCellType()} API (https): %s' % (port,))
port = cell.conf.get('https:port')
if port is None:
logger.info(f'...{cell.getCellType()} API (https): disabled')
else:
logger.info(f'...{cell.getCellType()} API (https): {port}')

if opts.name is not None:
cell.dmon.share(opts.name, cell)
if outp is not None:
outp.printf(f'...{cell.getCellType()} API (telepath name): %s' % (opts.name,))
logger.info(f'...{cell.getCellType()} API (telepath name): {opts.name}')

except (asyncio.CancelledError, Exception):
await cell.fini()
Expand All @@ -2002,7 +2010,7 @@ async def execmain(cls, argv, outp=None):

Args:
argv (list): A list of command line arguments to launch the Cell with.
outp (s_ouput.OutPut): Optional, an output object.
outp (s_ouput.OutPut): Optional, an output object. No longer used in the default implementation.

Notes:
This coroutine waits until the Cell is fini'd or a SIGINT/SIGTERM signal is sent to the process.
Expand Down
8 changes: 4 additions & 4 deletions synapse/lib/stormlib/stix.py
Expand Up @@ -509,13 +509,13 @@ def _validateConfig(core, config):
mesg = f'STIX Bundle config has invalid rel entry {formname} {stixtype} {stixrel}.'
raise s_exc.BadConfValu(mesg=mesg)

def _validateStixProc(bundle, loglevel):
def _validateStixProc(bundle, logconf):
'''
Multiprocessing target for stix validation
'''
# This logging call is okay to run since we're executing in
# our own process space and no logging has been configured.
s_common.setlogging(logger, loglevel)
s_common.setlogging(logger, **logconf)

resp = validateStix(bundle)
return resp
Expand Down Expand Up @@ -600,8 +600,8 @@ def getObjLocals(self):

async def validateBundle(self, bundle):
bundle = await s_stormtypes.toprim(bundle)
loglevel = logger.getEffectiveLevel()
resp = await s_coro.spawn(s_common.todo(_validateStixProc, bundle, loglevel=loglevel))
logconf = await self.runt.snap.core._getSpawnLogConf()
resp = await s_coro.spawn(s_common.todo(_validateStixProc, bundle, logconf=logconf))
return resp

async def liftBundle(self, bundle):
Expand Down
40 changes: 40 additions & 0 deletions synapse/lib/structlog.py
@@ -0,0 +1,40 @@
import json

import logging

import synapse.common as s_common

class JsonFormatter(logging.Formatter):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def format(self, record: logging.LogRecord):

record.message = record.getMessage()
mesg = self.formatMessage(record)
ret = {
'message': mesg,
'logger': {
'name': record.name,
'process': record.processName,
'filename': record.filename,
'func': record.funcName,
},
'level': record.levelname,
'time': self.formatTime(record, self.datefmt),
}

if record.exc_info:
name, info = s_common.err(record.exc_info[1], fulltb=True)
# This is the actual exception name. The ename key is the function name.
info['errname'] = name
ret['err'] = info

# stuffing our extra into a single dictionary avoids a loop
# over record.__dict__ extracting fields which are not known
# attributes for each log record.
extras = record.__dict__.get('synapse')
if extras:
ret.update({k: v for k, v in extras.items() if k not in ret})

return json.dumps(ret, default=str)