Skip to content

Commit

Permalink
Merge pull request #142 from Carriyo/fix-mapping-template
Browse files Browse the repository at this point in the history
fix for broken template mapping json
  • Loading branch information
vanthome committed Jun 8, 2020
2 parents 64c0bdd + 26f4172 commit d7de02c
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 63 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ If multiple objects are provided as arguments, the contents are stringified.
- `messageType` [`_doc`] the type (path segment after the index path) under which the messages are stored under the index.
- `transformer` [see below] a transformer function to transform logged data into a different message structure.
- `ensureMappingTemplate` [`true`] If set to `true`, the given `mappingTemplate` is checked/ uploaded to ES when the module is sending the fist log message to make sure the log messages are mapped in a sensible manner.
- `mappingTemplate` [see file `index-template-mapping.json` file] the mapping template to be ensured as parsed JSON.
- `mappingTemplate` [see file `index-template-mapping-es-gte-7.json` or `index-template-mapping-es-lte-6.json`] the mapping template to be ensured as parsed JSON.
- `elasticsearchVersion` [`7`] Elasticsearch version you are using. This helps decide the default mapping template that will be used when `ensureMappingTemplate` is `true` and `mappingTemplate` is `undefined`
- `flushInterval` [`2000`] distance between bulk writes in ms.
- `client` An [elasticsearch client](https://www.npmjs.com/package/@elastic/elasticsearch) instance. If given, all following options are ignored.
- `clientOpts` An object hash passed to the ES client. See [its docs](https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/client-configuration.html) for supported options.
Expand Down
59 changes: 34 additions & 25 deletions bulk_writer.js
Original file line number Diff line number Diff line change
Expand Up @@ -183,34 +183,40 @@ BulkWriter.prototype.checkEsConnection = function checkEsConnection() {
debug('checking for ES connection');
thiz.client.cluster.health({
timeout: '5s',
wait_for_nodes: '1',
wait_for_nodes: '>=1',
wait_for_status: 'yellow'
})
.then(
(res) => {
thiz.esConnection = true;
// Ensure mapping template is existing if desired
if (thiz.options.ensureMappingTemplate) {
thiz.ensureMappingTemplate(fulfill, reject);
} else {
fulfill(true);
}
if (thiz.options.buffering === true) {
debug('starting bulk writer');
thiz.running = true;
thiz.tick();
}
},
(err) => {
debug('re-checking for connection to ES');
if (operation.retry(err)) {
return;
(res) => {
thiz.esConnection = true;
const start = () => {
if (thiz.options.buffering === true) {
debug('starting bulk writer');
thiz.running = true;
thiz.tick();
}
};
// Ensure mapping template is existing if desired
if (thiz.options.ensureMappingTemplate) {
thiz.ensureMappingTemplate((res1) => {
fulfill(res1);
start();
}, reject);
} else {
fulfill(true);
start();
}
},
(err) => {
debug('re-checking for connection to ES');
if (operation.retry(err)) {
return;
}
thiz.esConnection = false;
debug('cannot connect to ES');
reject(new Error('Cannot connect to ES'));
}
thiz.esConnection = false;
debug('cannot connect to ES');
reject(new Error('Cannot connect to ES'));
}
);
);
});
});
};
Expand All @@ -227,8 +233,11 @@ BulkWriter.prototype.ensureMappingTemplate = function ensureMappingTemplate(
// eslint-disable-next-line prefer-destructuring
let mappingTemplate = thiz.options.mappingTemplate;
if (mappingTemplate === null || typeof mappingTemplate === 'undefined') {
// es version 6 and below will use 'index-template-mapping-es-lte-6.json'
// 7 and above will use 'index-template-mapping-es-gte-7.json'
const esVersion = Number(thiz.options.elasticsearchVersion) >= 7 ? 'gte-7' : 'lte-6';
const rawdata = fs.readFileSync(
path.join(__dirname, 'index-template-mapping.json')
path.join(__dirname, 'index-template-mapping-es-' + esVersion + '.json')
);
mappingTemplate = JSON.parse(rawdata);
mappingTemplate.index_patterns = indexPrefix + '-*';
Expand Down
22 changes: 22 additions & 0 deletions index-template-mapping-es-gte-7.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"index_patterns": ["logs-*"],
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0,
"index": {
"refresh_interval": "5s"
}
},
"mappings": {
"properties": {
"@timestamp": { "type": "date" },
"@version": { "type": "keyword" },
"message": { "type": "text" },
"severity": { "type": "keyword" },
"fields": {
"dynamic": true,
"properties": { }
}
}
}
}
36 changes: 36 additions & 0 deletions index-template-mapping-es-lte-6.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"index_patterns": [
"logs-*"
],
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0,
"index": {
"refresh_interval": "5s"
}
},
"mappings": {
"_doc": {
"properties": {
"@timestamp": {
"type": "date"
},
"@version": {
"type": "keyword"
},
"message": {
"type": "text",
"index": true
},
"severity": {
"type": "keyword",
"index": true
},
"fields": {
"dynamic": true,
"properties": {}
}
}
}
}
}
37 changes: 0 additions & 37 deletions index-template-mapping.json

This file was deleted.

1 change: 1 addition & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export interface ElasticsearchTransportOptions extends TransportStream.Transport
transformer?: Transformer;
mappingTemplate?: { [key: string]: any };
ensureMappingTemplate?: boolean;
elasticsearchVersion?: number;
flushInterval?: number;
waitForActiveShards?: number | 'all';
handleExceptions?: boolean;
Expand Down
2 changes: 2 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class ElasticsearchTransport extends Transport {
messageType: '_doc',
transformer: defaultTransformer,
ensureMappingTemplate: true,
elasticsearchVersion: 7,
flushInterval: 2000,
waitForActiveShards: 1,
handleExceptions: false,
Expand Down Expand Up @@ -79,6 +80,7 @@ class ElasticsearchTransport extends Transport {
indexPrefix: opts.indexPrefix,
buffering: opts.buffering,
bufferLimit: opts.buffering ? opts.bufferLimit : 0,
elasticsearchVersion: opts.elasticsearchVersion,
};

this.bulkWriter = new BulkWriter(this, this.client, bulkWriteropts);
Expand Down
31 changes: 31 additions & 0 deletions test/test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const fs = require('fs');
const should = require('should');
const winston = require('winston');
const http = require('http');

require('../index');
const defaultTransformer = require('../transformer');
Expand All @@ -23,12 +24,23 @@ function NullLogger(config) {
this.close = (msg) => {};
}

process.on('unhandledRejection', (error) => {
console.error(error);
process.exit(1);
});
process.on('uncaughtException', (error) => {
console.error(error);
process.exit(1);
});

let elasticsearchVersion = 7;
function createLogger(buffering) {
return winston.createLogger({
transports: [
new winston.transports.Elasticsearch({
flushInterval: 1,
buffering,
elasticsearchVersion,
clientOpts: {
log: NullLogger,
node: 'http://localhost:9200'
Expand All @@ -38,6 +50,25 @@ function createLogger(buffering) {
});
}

before(() => {
return new Promise((resolve) => {
// get ES version being used
http.get('http://localhost:9200', (res) => {
res.setEncoding('utf8');
let body = '';
res.on('data', (data) => {
body += data;
});
res.on('error', () => { resolve(); });
res.on('end', () => {
body = JSON.parse(body);
elasticsearchVersion = parseInt(body.version.number.split('.')[0], 10) || 7;
resolve();
});
});
});
});

describe('the default transformer', () => {
it('should transform log data from winston into a logstash like structure', (done) => {
const transformed = defaultTransformer({
Expand Down

0 comments on commit d7de02c

Please sign in to comment.