diff --git a/README.md b/README.md index a5e3f68..9b56453 100644 --- a/README.md +++ b/README.md @@ -64,7 +64,8 @@ If multiple objects are provided as arguments, the contents are stringified. - `messageType` [`_doc`] the type (path segment after the index path) under which the messages are stored under the index. - `transformer` [see below] a transformer function to transform logged data into a different message structure. - `ensureMappingTemplate` [`true`] If set to `true`, the given `mappingTemplate` is checked/ uploaded to ES when the module is sending the fist log message to make sure the log messages are mapped in a sensible manner. -- `mappingTemplate` [see file `index-template-mapping.json` file] the mapping template to be ensured as parsed JSON. +- `mappingTemplate` [see file `index-template-mapping-es-gte-7.json` or `index-template-mapping-es-lte-6.json`] the mapping template to be ensured as parsed JSON. +- `elasticsearchVersion` [`7`] Elasticsearch version you are using. This helps decide the default mapping template that will be used when `ensureMappingTemplate` is `true` and `mappingTemplate` is `undefined` - `flushInterval` [`2000`] distance between bulk writes in ms. - `client` An [elasticsearch client](https://www.npmjs.com/package/@elastic/elasticsearch) instance. If given, all following options are ignored. - `clientOpts` An object hash passed to the ES client. See [its docs](https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/client-configuration.html) for supported options. diff --git a/bulk_writer.js b/bulk_writer.js index eaf5170..1399d95 100644 --- a/bulk_writer.js +++ b/bulk_writer.js @@ -183,34 +183,40 @@ BulkWriter.prototype.checkEsConnection = function checkEsConnection() { debug('checking for ES connection'); thiz.client.cluster.health({ timeout: '5s', - wait_for_nodes: '1', + wait_for_nodes: '>=1', wait_for_status: 'yellow' }) .then( - (res) => { - thiz.esConnection = true; - // Ensure mapping template is existing if desired - if (thiz.options.ensureMappingTemplate) { - thiz.ensureMappingTemplate(fulfill, reject); - } else { - fulfill(true); - } - if (thiz.options.buffering === true) { - debug('starting bulk writer'); - thiz.running = true; - thiz.tick(); - } - }, - (err) => { - debug('re-checking for connection to ES'); - if (operation.retry(err)) { - return; + (res) => { + thiz.esConnection = true; + const start = () => { + if (thiz.options.buffering === true) { + debug('starting bulk writer'); + thiz.running = true; + thiz.tick(); + } + }; + // Ensure mapping template is existing if desired + if (thiz.options.ensureMappingTemplate) { + thiz.ensureMappingTemplate((res1) => { + fulfill(res1); + start(); + }, reject); + } else { + fulfill(true); + start(); + } + }, + (err) => { + debug('re-checking for connection to ES'); + if (operation.retry(err)) { + return; + } + thiz.esConnection = false; + debug('cannot connect to ES'); + reject(new Error('Cannot connect to ES')); } - thiz.esConnection = false; - debug('cannot connect to ES'); - reject(new Error('Cannot connect to ES')); - } - ); + ); }); }); }; @@ -227,8 +233,11 @@ BulkWriter.prototype.ensureMappingTemplate = function ensureMappingTemplate( // eslint-disable-next-line prefer-destructuring let mappingTemplate = thiz.options.mappingTemplate; if (mappingTemplate === null || typeof mappingTemplate === 'undefined') { + // es version 6 and below will use 'index-template-mapping-es-lte-6.json' + // 7 and above will use 'index-template-mapping-es-gte-7.json' + const esVersion = Number(thiz.options.elasticsearchVersion) >= 7 ? 'gte-7' : 'lte-6'; const rawdata = fs.readFileSync( - path.join(__dirname, 'index-template-mapping.json') + path.join(__dirname, 'index-template-mapping-es-' + esVersion + '.json') ); mappingTemplate = JSON.parse(rawdata); mappingTemplate.index_patterns = indexPrefix + '-*'; diff --git a/index-template-mapping-es-gte-7.json b/index-template-mapping-es-gte-7.json new file mode 100644 index 0000000..5c86a60 --- /dev/null +++ b/index-template-mapping-es-gte-7.json @@ -0,0 +1,22 @@ +{ + "index_patterns": ["logs-*"], + "settings": { + "number_of_shards": 1, + "number_of_replicas": 0, + "index": { + "refresh_interval": "5s" + } + }, + "mappings": { + "properties": { + "@timestamp": { "type": "date" }, + "@version": { "type": "keyword" }, + "message": { "type": "text" }, + "severity": { "type": "keyword" }, + "fields": { + "dynamic": true, + "properties": { } + } + } + } +} diff --git a/index-template-mapping-es-lte-6.json b/index-template-mapping-es-lte-6.json new file mode 100644 index 0000000..37c390c --- /dev/null +++ b/index-template-mapping-es-lte-6.json @@ -0,0 +1,36 @@ +{ + "index_patterns": [ + "logs-*" + ], + "settings": { + "number_of_shards": 1, + "number_of_replicas": 0, + "index": { + "refresh_interval": "5s" + } + }, + "mappings": { + "_doc": { + "properties": { + "@timestamp": { + "type": "date" + }, + "@version": { + "type": "keyword" + }, + "message": { + "type": "text", + "index": true + }, + "severity": { + "type": "keyword", + "index": true + }, + "fields": { + "dynamic": true, + "properties": {} + } + } + } + } +} diff --git a/index-template-mapping.json b/index-template-mapping.json deleted file mode 100644 index 1b069be..0000000 --- a/index-template-mapping.json +++ /dev/null @@ -1,37 +0,0 @@ -{ - "index_patterns": [ - "logs-*" - ], - "settings": { - "number_of_shards": 1, - "number_of_replicas": 0, - "index": { - "refresh_interval": "5s" - } - }, - "mappings": { - "_source": { - "enabled": true - }, - "properties": { - "@timestamp": { - "type": "date" - }, - "@version": { - "type": "keyword" - }, - "message": { - "type": "text", - "index": true - }, - "severity": { - "type": "keyword", - "index": true - }, - "fields": { - "dynamic": true, - "properties": {} - } - } - } -} diff --git a/index.d.ts b/index.d.ts index ed007a0..3731b06 100644 --- a/index.d.ts +++ b/index.d.ts @@ -22,6 +22,7 @@ export interface ElasticsearchTransportOptions extends TransportStream.Transport transformer?: Transformer; mappingTemplate?: { [key: string]: any }; ensureMappingTemplate?: boolean; + elasticsearchVersion?: number; flushInterval?: number; waitForActiveShards?: number | 'all'; handleExceptions?: boolean; diff --git a/index.js b/index.js index e8ea45c..6efdd4c 100644 --- a/index.js +++ b/index.js @@ -40,6 +40,7 @@ class ElasticsearchTransport extends Transport { messageType: '_doc', transformer: defaultTransformer, ensureMappingTemplate: true, + elasticsearchVersion: 7, flushInterval: 2000, waitForActiveShards: 1, handleExceptions: false, @@ -79,6 +80,7 @@ class ElasticsearchTransport extends Transport { indexPrefix: opts.indexPrefix, buffering: opts.buffering, bufferLimit: opts.buffering ? opts.bufferLimit : 0, + elasticsearchVersion: opts.elasticsearchVersion, }; this.bulkWriter = new BulkWriter(this, this.client, bulkWriteropts); diff --git a/test/test.js b/test/test.js index 0810bf1..128fe93 100644 --- a/test/test.js +++ b/test/test.js @@ -1,6 +1,7 @@ const fs = require('fs'); const should = require('should'); const winston = require('winston'); +const http = require('http'); require('../index'); const defaultTransformer = require('../transformer'); @@ -23,12 +24,23 @@ function NullLogger(config) { this.close = (msg) => {}; } +process.on('unhandledRejection', (error) => { + console.error(error); + process.exit(1); +}); +process.on('uncaughtException', (error) => { + console.error(error); + process.exit(1); +}); + +let elasticsearchVersion = 7; function createLogger(buffering) { return winston.createLogger({ transports: [ new winston.transports.Elasticsearch({ flushInterval: 1, buffering, + elasticsearchVersion, clientOpts: { log: NullLogger, node: 'http://localhost:9200' @@ -38,6 +50,25 @@ function createLogger(buffering) { }); } +before(() => { + return new Promise((resolve) => { + // get ES version being used + http.get('http://localhost:9200', (res) => { + res.setEncoding('utf8'); + let body = ''; + res.on('data', (data) => { + body += data; + }); + res.on('error', () => { resolve(); }); + res.on('end', () => { + body = JSON.parse(body); + elasticsearchVersion = parseInt(body.version.number.split('.')[0], 10) || 7; + resolve(); + }); + }); + }); +}); + describe('the default transformer', () => { it('should transform log data from winston into a logstash like structure', (done) => { const transformed = defaultTransformer({