Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make log routing for containers flexible #179

Merged
merged 6 commits into from Jan 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions .travis.yml
Expand Up @@ -7,5 +7,7 @@ node_js:
- "8"
- "10"
- "12"
- "13"
install:
- npm i -g mocha
- npm i
4 changes: 4 additions & 0 deletions bin/logagent.js
Expand Up @@ -298,6 +298,7 @@ LaCli.prototype.loadPlugins = function (configFile) {
var inputFilterSections = Object.keys(configFile.inputFilter)
inputFilterSections.forEach(function (key) {
if (configFile.inputFilter[key].module) {
configFile.inputFilter[key].configName = key
inputFilter.push(configFile.inputFilter[key])
}
})
Expand All @@ -320,6 +321,7 @@ LaCli.prototype.loadPlugins = function (configFile) {
configFile.output[key].module = 'elasticsearch'
}
if (configFile.output[key].module) {
configFile.output[key].configName = key
plugins.push({
module: moduleAlias[configFile.output[key].module] || configFile.output[key].module,
config: configFile.output[key],
Expand Down Expand Up @@ -358,6 +360,7 @@ LaCli.prototype.loadPlugins = function (configFile) {
if (configFile && configFile.outputFilter) {
var outputFilterSections = Object.keys(configFile.outputFilter)
outputFilterSections.forEach(function (key) {
configFile.outputFilter[key].configName = key
if (configFile.outputFilter[key].module) {
outputFilter.push(configFile.outputFilter[key])
}
Expand Down Expand Up @@ -401,6 +404,7 @@ LaCli.prototype.loadPlugins = function (configFile) {
plugins.push({
module: '../lib/plugins/output/elasticsearch',
config: {
configName: `argv_elasticsearch_${this.argv.elasticsearchUrl}/${this.argv.index}`,
indices: this.argv.indices,
url: this.argv.elasticsearchUrl,
index: this.argv.index
Expand Down
30 changes: 24 additions & 6 deletions lib/plugins/input/docker/dockerInspect.js
Expand Up @@ -3,11 +3,11 @@ var Docker = require('dockerode')
var docker = new Docker()
var minimatch = require('minimatch')
var consoleLogger = require('../../../util/logger.js')
var parser = require('../../../util/parser.js')

var dockerInfo = {}
var tagIds = null


docker.info(function dockerInfoHandler (err, data) {
if (err) {
console.error(err)
Expand Down Expand Up @@ -67,7 +67,6 @@ function getValue (name, list, info) {
info.tags = {}
}
info.tags[keys[k]] = list[keys[k]]
info.tags
}
}
} else {
Expand Down Expand Up @@ -105,16 +104,19 @@ function extractLoggingTags (labels, env, info) {
}

function getLogseneEnabled (info) {
var token = null
let token = null

extractLoggingTags(info.Config.Labels, info.Config.Env, info)
// tag container info with LOGSENE_ENABLED flag
// check for Labels
if (info.Config && info.Config.Labels && info.Config.Labels.LOGSENE_ENABLED !== undefined) {
info.LOGSENE_ENABLED = info.Config.Labels.LOGSENE_ENABLED
if (info.Config && info.Config.Labels) {
info.LOGSENE_ENABLED = info.Config.Labels.LOGSENE_ENABLED || info.Config.Labels.LOGS_ENABLED || null
} else {
// no Label set, check for ENV var
info.LOGSENE_ENABLED = getEnvVar('LOGSENE_ENABLED', info.Config.Env)
if (!info.LOGSENE_ENABLED) {
info.LOGSENE_ENABLED = getEnvVar('LOGS_ENABLED', info.Config.Env)
}
}
if (info.LOGSENE_ENABLED === null) {
// no Label or env var set, use LOGSENE_ENABLED_DEFAULT
Expand Down Expand Up @@ -144,11 +146,27 @@ function getLogseneEnabled (info) {
}
}
info.LOGSENE_TOKEN = token || process.env.LOGS_TOKEN || process.env.LOGSENE_TOKEN
// get optional log receiver URLs
let logsReceiver = null
if (info.Config && info.Config.Labels && info.Config.Labels.LOGS_RECEIVER_URL) {
logsReceiver = info.Config.Labels.LOGS_RECEIVER_URL
} else {
logsReceiver = getEnvVar('LOGS_RECEIVER_URL', info.Config.Env)
}
if (logsReceiver) {
info.logsReceiver = parser.parseReceiverList(logsReceiver)
}
// get optional logs target name
if (info.Config && info.Config.Labels && info.Config.Labels.LOGS_DESTINATION) {
info.LOGS_DESTINATION = info.Config.Labels.LOGS_DESTINATION
} else {
info.LOGS_DESTINATION = getEnvVar('LOGS_DESTINATION', info.Config.Env)
}
return info
}

function getLogseneToken (err, info) {
var token = null
let token = null
if (!err) {
extractLoggingTags(info.Config.Labels, info.Config.Env, info)
// tag container info with LOGSENE_ENABLED flag
Expand Down
42 changes: 15 additions & 27 deletions lib/plugins/output-filter/docker-log-enrichment.js
@@ -1,7 +1,8 @@
var warningRegex = /warning/i
var errorRegex = /[^|\S]error|exception/i
var K8S = /^k8s_/
var parseImageRegex = /^(\S+?\.\S+?\/|\S+?\:\d+\/){0,1}(\S+?):(\S+?){0,1}(@\S+?){0,1}$/

var parser = require('../../util/parser.js')

var k8sMetadata = {}
if (process.env.SEVERITY_ERROR_PATTERN) {
Expand Down Expand Up @@ -46,24 +47,6 @@ function parseKubernetesInfo (containerName, logObject) {
}
}

function parseImage (image) {
var rv = { name: image }
var result = parseImageRegex.exec(image)
if (result) {
if (result.length > 3) {
if (result[1]) {
rv.registry = result[1]
}
rv.name = result[2]
rv.tag = result[3]
if (result[4] !== undefined) {
rv.digest = result[4].substring(1, result[4].length)
}
}
}
return rv
}

module.exports = function enrichDockerLogs (context, config, eventEmitter, data, cb) {
if (!context.container_name) {
return cb(null, data)
Expand All @@ -72,7 +55,7 @@ module.exports = function enrichDockerLogs (context, config, eventEmitter, data,
id: context.container_long_id,
type: 'docker',
name: context.container_name,
image: parseImage(context.image || ''),
image: parser.parseImage(context.image || ''),
host: {
hostname: process.env.SPM_REPORTED_HOSTNAME
}
Expand All @@ -89,16 +72,13 @@ module.exports = function enrichDockerLogs (context, config, eventEmitter, data,
context.index = context.dockerInspect.LOGSENE_TOKEN
data._index = context.dockerInspect.LOGSENE_TOKEN
}
if (context.dockerInspect && context.dockerInspect.LOGS_RECEIVER_URL) {
context.elasticsearchUrl = context.dockerInspect.LOGS_RECEIVER_URL
}
if (context.labels) {
/*
var keys = Object.keys(context.labels)
for (var i=0; i < keys.length; i++) {
data[ keys[i] ] = context.labels[ keys[i] ]
}
*/
console.log(context.labels)
data.labels = context.labels
}

if (context.dockerInspect && context.dockerInspect.Config && context.dockerInspect.Config.Labels) {
var swarmInfo = {}
var stackName = context.dockerInspect.Config.Labels['com.docker.stack.namespace']
Expand All @@ -116,6 +96,14 @@ module.exports = function enrichDockerLogs (context, config, eventEmitter, data,
data.swarm = swarmInfo
}
}
// set logs receiver url for output plugins
if (context.dockerInspect && context.dockerInspect.logsReceiver) {
context.logsReceiver = context.dockerInspect.logsReceiver
}
// set logs destination / name of ES output module
if (context.dockerInspect && context.dockerInspect.LOGS_DESTINATION) {
context.logsDestination = context.dockerInspect.LOGS_DESTINATION
}
var logObject = data
// make sure that top level message field is a String
var messageString = logObject.message || logObject.msg || logObject.MESSAGE
Expand Down
14 changes: 11 additions & 3 deletions lib/plugins/output-filter/kubernetes-enrichment.js
@@ -1,7 +1,7 @@
const { KubeConfig } = require('kubernetes-client')
const Client = require('kubernetes-client').Client
const Request = require('kubernetes-client/backends/request')

const parser = require('../../util/parser.js')
const kubeconfig = new KubeConfig()
var consoleLogger = require('../../util/logger.js')
var LRU = require('lru-cache')
Expand All @@ -10,7 +10,7 @@ var podCache = new LRU({
maxAge: 1000 * 60 * 60
})

var digestRegEx = /sha256:.+/i
// var digestRegEx = /sha256:.+/i
var client = null
const FALSE_REGEX = /false/i

Expand Down Expand Up @@ -59,7 +59,7 @@ function removeFields (pod, data) {
pod.stRemoveFields = false
return
}
var removeFields = annotations['REMOVE_FIELDS'] || annotations['sematext.com/logs-remove-fields']
var removeFields = annotations.REMOVE_FIELDS || annotations['sematext.com/logs-remove-fields']
if (removeFields) {
var fieldNames = removeFields.split(',')
for (var i = 0; i < fieldNames.length; i++) {
Expand Down Expand Up @@ -90,6 +90,13 @@ function checkLogsEnabled (pod, data, context) {
}
}

function checkLogsReceiverUrl (pod, data, context) {
var annotations = pod.metadata.annotations
if (annotations && annotations['sematext.com/logs-receiver-url']) {
context.logsReceiver = parser.parseReceiverList(annotations['sematext.com/logs-receiver-url'])
}
}

function addLogsIndex (pod, data) {
if (pod.stLogsTokenSet === false) {
return
Expand Down Expand Up @@ -136,6 +143,7 @@ function processAnnotations (data, context) {
replaceDockerImageName(pod, data)
removeFields(pod, data)
addLogsIndex(pod, data)
checkLogsReceiverUrl(pod, data, context)
}
}

Expand Down
45 changes: 31 additions & 14 deletions lib/plugins/output/elasticsearch.js
Expand Up @@ -51,14 +51,15 @@ function OutputElasticsearch (config, eventEmitter) {
}
}

OutputElasticsearch.prototype.indexData = function (token, logType, data, config) {
var logger = this.getLogger(token, logType, config)
OutputElasticsearch.prototype.indexData = function (token, logType, data, config, logsReceiverUrl) {
var logger = this.getLogger(token, logType, config, logsReceiverUrl)
this.logCount++
logger.log(data.severity || data.level || 'info', data.message || data.msg || data.MESSAGE, data)
}

OutputElasticsearch.prototype.getLogger = function (token, type, config) {
var loggerName = token + '/' + type
OutputElasticsearch.prototype.getLogger = function (token, type, config, logsReceiverUrl) {
var url = logsReceiverUrl || config.elasticsearchUrl
var loggerName = `${url}/${token}`
if (!this.loggers[loggerName]) {
var options = { useIndexInBulkUrl: true }
var usedType = type
Expand All @@ -68,11 +69,11 @@ OutputElasticsearch.prototype.getLogger = function (token, type, config) {
} else {
usedType = config.type || 'logs'
}
if (this.httpOptions) {
if (this.httpOptions && url !== logsReceiverUrl) {
options.httpOptions = this.httpOptions
}
console.log('using type ' + usedType)
var logger = new Logsene(token, usedType, config.elasticsearchUrl,
// console.log('using type ' + usedType)
var logger = new Logsene(token, usedType, url,
config.diskBufferDir, options)
this.laStats.usedTokens.push(token)
logger.on('log', function (data) {
Expand All @@ -89,25 +90,43 @@ OutputElasticsearch.prototype.getLogger = function (token, type, config) {
this.laStats.logsShipped += data.count
}.bind(this))
if (process.env.LOG_NEW_TOKENS) {
consoleLogger.log('create logger for token: ' + token)
consoleLogger.log(`New elasticsearch logger ${loggerName} created`)
}
this.loggers[loggerName] = logger
}
return this.loggers[loggerName]
}

OutputElasticsearch.prototype.eventHandler = function (data, context) {
if (context.logsDestination && this.config.configName && this.config.configName.indexOf(context.logsDestination) === -1) {
return
}
var config = reduceConfig(context, data, this.config)
var index = data._index || context.index || config.index || process.env.LOGSENE_TOKEN
if (config.tokenMapper) {
index = config.tokenMapper.findToken(data.logSource || context.sourceName) || index
if (config.dropLogsForUnmatchedIndices === true) {
index = config.tokenMapper.findToken(data.logSource || context.sourceName)
} else {
index = config.tokenMapper.findToken(data.logSource || context.sourceName) || index
}
}
if (!index) {
return
if (index) {
// support for time-based index patterns
index = applyDateFormatToIndex(index)
this.indexData(index, data['_type'] || 'logs', data, config, context.logsReceiverUrl)
}
if (context.logsReceiver && context.logsReceiver.length > 0) {
for (let i = 0; i < context.logsReceiver.length; i++) {
this.indexData(
applyDateFormatToIndex(context.logsReceiver[i].index),
data['_type'] || 'logs', data, config, context.logsReceiver[i].url)
}
}
}

function applyDateFormatToIndex (index) {
// support for time-based index patterns
index = index.replace(/YYYY|MM|DD/g, function (match) {
return index.replace(/YYYY|MM|DD/g, function (match) {
if (match === 'YYYY') {
return '' + data['@timestamp'].getFullYear()
}
Expand All @@ -119,8 +138,6 @@ OutputElasticsearch.prototype.eventHandler = function (data, context) {
}
return match
})

this.indexData(index, data['_type'] || 'logs', data, config)
}

OutputElasticsearch.prototype.start = function () {
Expand Down
55 changes: 55 additions & 0 deletions lib/util/parser.js
@@ -0,0 +1,55 @@
'use strict'
/**
* Parse a list of URLs to an array with base URL and index for elasticsearch API endpoints
* Return value is an array with objects with url and index property, e.g.
* [{url: 'http://servername', index: 'logs'}]
* @receiverListAsString a comma separatedlist of URLs
*/
function parseReceiverList (receiverListAsString) {
if (!receiverListAsString) {
return undefined
}
let receivers = []
let parseUrlRegx = /(\S+:\/\/\S+?)\/(\S+)$/i
let receiverList = receiverListAsString.split(',')
for (let receiver of receiverList) {
let url = receiver.match(parseUrlRegx)
if (url && url.length === 3) {
receivers.push({
url: url[1],
index: url[2]
})
}
}
return receivers
}

/**
* Parse Docker image names.
* Returns an object with name,registry, tag properties.
* [{url: 'http://servername', index: 'logs'}]
* @receiverListAsString a comma separatedlist of URLs
*/
var parseImageRegex = /^(\S+?\.\S+?\/|\S+?\:\d+\/){0,1}(\S+?):(\S+?){0,1}(@\S+?){0,1}$/i
function parseImage (image) {
var rv = { name: image }
var result = parseImageRegex.exec(image)
if (result) {
if (result.length > 3) {
if (result[1]) {
rv.registry = result[1]
}
rv.name = result[2]
rv.tag = result[3]
if (result[4] !== undefined) {
rv.digest = result[4].substring(1, result[4].length)
}
}
}
return rv
}

module.exports = {
parseImage: parseImage,
parseReceiverList: parseReceiverList
}