Skip to content

Commit

Permalink
added TailReadStream as new experimental tail module
Browse files Browse the repository at this point in the history
  • Loading branch information
leeliu committed Mar 31, 2017
1 parent 21aab6e commit d5314e0
Show file tree
Hide file tree
Showing 3 changed files with 321 additions and 0 deletions.
26 changes: 26 additions & 0 deletions lib/file-utilities.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ var glob = require('glob');
var async = require('async');
var os = require('os');
var spawn = require('child_process').spawn;
var TailReadStream = require('./tailreadstream/tailreadstream');
var Splitter = require('./tailreadstream/line-splitter');

var GLOB_CHARS_REGEX = /[\*\?\[\]\(\)]/;
var globalExclude = [
Expand Down Expand Up @@ -104,6 +106,30 @@ module.exports.streamFiles = function(config, logfiles, callback) {

tails.push(tail);

} else if (config.TAIL_MODE === 'trs') {
debug('tailing: ' + file);
tail = TailReadStream.tail(file);
tail.pipe(new Splitter())
.on('data', line => {
linebuffer.addMessage({ e: 'l', t: Date.now(), l: line, f: file });
});

tail.on('error', err => {
log('Tail error: ' + file + ': ' + err);
});

tail.on('nofile', err => {
log('File does not exist, stopped tailing: ' + file + ' after ' + tail.timeout + 'ms');
});

tail.on('rename', () => {
log('Log rotated: ' + file + ' by rename');
});

tail.on('truncate', () => {
log('Log rotated: ' + file + ' by truncation');
});

} else {
try {
tail = new Tail(file, '\n', { interval: 100, blockSize: 10 * 1024 * 1024 });
Expand Down
55 changes: 55 additions & 0 deletions lib/tailreadstream/line-splitter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
'use strict';
// from https://github.com/msimerson/safe-log-reader/blob/master/lib/line-splitter.js

// https://nodejs.org/api/stream.html#stream_object_mode

var StringDecoder = require('string_decoder').StringDecoder;
var Transform = require('stream').Transform;
var util = require('util');

/**
 * Transform stream that re-chunks incoming text into individual lines.
 *
 * Input may be Buffers or already-decoded strings; each complete line is
 * pushed downstream as a string without its separator.
 *
 * @param {Object} [options]
 * @param {Object} [options.transform] options handed to stream.Transform
 *        (defaults to `{ objectMode: true }`).
 * @param {string} [options.encoding='utf8'] encoding used to decode Buffers.
 * @param {string} [options.seperator='\n'] line separator (legacy spelling,
 *        kept for existing callers).
 * @param {string} [options.separator] correctly spelled alias; only used
 *        when `seperator` is not given.
 * @param {number} [options.bytes=0] initial value of the `bytes` counter.
 */
function LineSplitter(options) {
    if (!options) { options = {}; }

    // Use a local default rather than writing it back onto the caller's
    // options object, which may be shared or reused.
    var transformOptions = options.transform || { objectMode: true };

    Transform.call(this, transformOptions);

    this._encoding = options.encoding || 'utf8';
    this._seperator = options.seperator || options.separator || '\n';
    this._buffer = '';
    this._decoder = new StringDecoder(this._encoding);

    // running count of input bytes consumed so far
    this.bytes = options.bytes || 0;
}

util.inherits(LineSplitter, Transform);

/**
 * Appends a chunk to the pending text and pushes every complete line.
 * The text after the last separator stays buffered until more input
 * arrives or the stream is flushed.
 *
 * @param {Buffer|string} chunk
 * @param {string} encoding encoding of `chunk` when it is a string
 * @param {Function} done
 */
LineSplitter.prototype._transform = function(chunk, encoding, done) {
    if (typeof chunk === 'string') {
        // already decoded upstream; string length is in UTF-16 code units,
        // so measure the encoded size to keep `bytes` a real byte count
        this.bytes += Buffer.byteLength(chunk, encoding);
        this._buffer += chunk;
    } else {
        // raw bytes: the decoder holds back partial multi-byte characters
        // that straddle chunk boundaries
        this.bytes += chunk.length;
        this._buffer += this._decoder.write(chunk);
    }

    var lines = this._buffer.split(this._seperator);
    this._buffer = lines.pop();

    for (var i = 0; i < lines.length; i++) {
        this.push(lines[i]);
    }
    done();
};

/**
 * Emits the trailing text (after the last separator) when the input ends.
 * A remainder consisting only of whitespace is dropped; anything else is
 * pushed unmodified, like every other line.
 *
 * @param {Function} done
 */
LineSplitter.prototype._flush = function(done) {
    // decoder.end() releases bytes of an incomplete trailing character
    var rem = this._buffer + this._decoder.end();
    this._buffer = '';
    if (rem.trim()) { this.push(rem); }
    done();
};

module.exports = function(options) {
return new LineSplitter(options);
};
240 changes: 240 additions & 0 deletions lib/tailreadstream/tailreadstream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
'use strict';
/*
* The base code for this is taken from the 'node-growing-file' module (https://github.com/felixge/node-growing-file) by felixge
* Due to the inactivity of the repo and our desire to switch to ES6 syntax, the code has been ported over with a few minor tweaks to the calling params
*/

const fs = require('fs');
const debug = require('debug')('logdna:tailreadstream');
const Readable = require('stream').Readable;

// Delay in ms between file-size polls that detect truncation (default for options.watchinterval).
const DEFAULT_WATCH_INTERVAL = 1000;
// Delay in ms before re-reading from the current offset, or re-checking for a missing file (default for options.interval).
const DEFAULT_READ_INTERVAL = 250;
// Accumulated idle time in ms after which tailing gives up (default for options.timeout).
const DEFAULT_READ_TIMEOUT = 3600000; // 1hr timeout
// Files smaller than this many bytes are read from the beginning instead of from their end (default for options.tailheadsize).
const DEFAULT_TAILHEAD_SIZE = 32768; // 32kb


/**
 * Follows a file as it grows and re-emits its contents.
 *
 * The file is read from the current offset to EOF with a short-lived
 * fs.ReadStream; once that stream ends, a new one is opened after
 * `interval` ms, so appended data keeps flowing. Chunks are delivered by
 * emitting 'data' directly (not through Readable#push / read()).
 *
 * Events emitted by this class:
 *  - 'data'     (Buffer)  a chunk read from the file
 *  - 'rename'             the file disappeared after having been readable
 *  - 'truncate' (size)    the file became shorter than the current offset
 *  - 'nofile'   (err)     the file was missing and the idle timeout elapsed
 *  - 'end'                the idle timeout elapsed without new data
 *  - 'error'    (err)     any other stat/read failure
 */
class TailReadStream extends Readable {
    /**
     * @param {string} filepath file to follow
     * @param {Object} [options]
     * @param {number} [options.interval] ms between read retries
     * @param {number} [options.timeout] idle ms before giving up
     * @param {number} [options.watchinterval] ms between truncation checks
     * @param {number} [options.tailheadsize] files smaller than this many
     *        bytes are read from the start
     */
    constructor(filepath, options) {
        options = options || {};

        super();

        this.readable = true;

        this._filepath = filepath;
        this._stream = null;
        this._offset = 0;

        this._interval = options.interval || DEFAULT_READ_INTERVAL;
        this._timeout = options.timeout || DEFAULT_READ_TIMEOUT;
        this._watchinterval = options.watchinterval || DEFAULT_WATCH_INTERVAL;
        this._tailheadsize = options.tailheadsize || DEFAULT_TAILHEAD_SIZE;
        this._idle = 0;

        this._reading = false;
        this._paused = false;
        this._ended = false;    // set by destroy(); stops every retry loop
        this._watching = false;
        this._deferred = false; // a read was requested while paused
    }

    static get DOES_NOT_EXIST_ERROR() { return 'ENOENT'; }

    /**
     * Creates a stream and starts reading immediately.
     *
     * @param {string} filepath
     * @param {boolean|number|string|Object} [fromstart] `true` reads from the
     *        beginning, a non-zero number (or numeric string) reads from that
     *        offset, any falsy value (including 0) tails from the end. An
     *        object in this position is treated as `options`.
     * @param {Object} [options] see the constructor
     * @returns {TailReadStream}
     */
    static tail(filepath, fromstart, options) {
        // typeof null === 'object': exclude it so tail(file, null, options)
        // keeps its options instead of replacing them with null
        if (fromstart !== null && typeof fromstart === 'object') { // shift args
            options = fromstart;
            fromstart = false;
        }
        const file = new this(filepath, options);
        if (fromstart) {
            if (typeof fromstart === 'boolean') {
                // read from start
                debug(`${filepath}: reading from beginning of file`);
                file._readFromOffsetUntilEof();
            } else {
                // read from offset
                debug(`${filepath}: reading from offset ${fromstart}`);
                file._readFromOffsetUntilEof(+fromstart);
            }
        } else {
            // tail from end
            debug(`${filepath}: tailing from end`);
            file._getFileSizeAndReadUntilEof();
        }
        return file;
    }

    get offset() { return this._offset; }
    get timeout() { return this._timeout; }
    set timeout(timeout) { this._timeout = timeout; }

    /**
     * Data is delivered by emitting 'data' directly, so there is nothing to
     * do when the Readable machinery asks for more. Defining this keeps an
     * incidental read() from reaching Readable's "not implemented" default.
     */
    _read() {}

    /**
     * Stops tailing for good: closes the underlying file stream (if any) and
     * prevents pending retry timers from opening a new one.
     */
    destroy() {
        this._ended = true;
        this.readable = false;
        if (this._stream) {
            if (typeof this._stream.destroy === 'function') {
                this._stream.destroy();
            }
            this._stream = null;
        }
    }

    /**
     * Pauses delivery. Safe to call before the first underlying stream
     * exists (e.g. while still waiting for the file to appear).
     * @returns {TailReadStream} this
     */
    pause() {
        this._paused = true;
        if (this._stream) {
            this._stream.pause();
        }
        return this;
    }

    /**
     * Resumes delivery and restarts the read loop if it stopped while paused.
     * @returns {TailReadStream} this
     */
    resume() {
        if (this._ended) {
            return this;
        }
        this._paused = false;
        if (this._stream) {
            this._stream.resume();
        }
        // Only kick off a read when one was already started or was skipped
        // because of the pause; otherwise the initial size lookup may still
        // be pending and reading now would start from the wrong offset.
        if (this._stream || this._deferred) {
            this._readFromOffsetUntilEof();
        }
        return this;
    }

    /**
     * Opens a read stream at the current offset and reads until EOF.
     * @param {number} [offset] new offset to read from; ignored when NaN/omitted
     */
    _readFromOffsetUntilEof(offset) {
        if (!isNaN(offset)) {
            this._offset = offset;
        }

        if (this._ended || this._reading) {
            return;
        }

        if (this._paused) {
            this._deferred = true; // picked up again by resume()
            return;
        }

        this._deferred = false;
        this._reading = true;

        this._stream = fs.createReadStream(this._filepath, {
            start: this._offset
        });

        this._stream.on('error', this._handleError.bind(this));
        this._stream.on('data', this._handleData.bind(this));
        this._stream.on('end', this._handleEnd.bind(this));
    }

    /**
     * @param {Function} callback called with (err) or (null, sizeInBytes)
     */
    _getFileSize(callback) {
        fs.stat(this._filepath, (err, stats) => {
            if (err) {
                return callback(err);
            }
            callback(null, stats.size);
        });
    }

    /**
     * Looks up the file size and starts reading from the end of the file, or
     * from the beginning when the file is smaller than `tailheadsize`.
     * While the file does not exist, retries every `interval` ms until the
     * idle timeout elapses.
     */
    _getFileSizeAndReadUntilEof() {
        if (this._ended) {
            return;
        }

        this._getFileSize((err, size) => {
            if (this._ended) {
                return;
            }

            if (err) {
                this.readable = false;

                if (this._hasTimedOut()) {
                    debug(`${this._filepath}: file does not exist, timed out after ${this._timeout}ms`);
                    this.emit('nofile', err);
                    return;
                }

                if (err.code === TailReadStream.DOES_NOT_EXIST_ERROR) {
                    debug(`${this._filepath}: file does not exist, waiting for it to appear...`);
                    setTimeout(this._getFileSizeAndReadUntilEof.bind(this), this._interval);
                    this._idle += this._interval;
                    return;
                }

                this.emit('error', err);
                return;
            }

            if (size < this._tailheadsize) {
                debug(`${this._filepath}: file is smaller than ${this._tailheadsize} bytes, reading from beginning`);
                size = 0; // tail from beginning of file if small enough (e.g. newly created files)
            }

            this._readFromOffsetUntilEof(size);
        });
    }

    /** Schedules the next read attempt from the current offset. */
    _retryInInterval() {
        setTimeout(this._readFromOffsetUntilEof.bind(this), this._interval);
    }

    /**
     * Handles an error from the underlying read stream. A missing file is
     * treated as a rename: 'rename' is emitted once, the offset is reset and
     * reading is retried until the idle timeout elapses.
     */
    _handleError(error) {
        this._reading = false;

        if (this._ended) {
            return;
        }

        if (this._hasTimedOut()) {
            debug(`${this._filepath}: file no longer exists, timed out after ${this._timeout}ms`);
            this.emit('nofile', error);
            return;
        }

        if (error.code === TailReadStream.DOES_NOT_EXIST_ERROR) {
            debug(`${this._filepath}: file renamed, waiting for it to reappear...`);
            if (this.readable) {
                this.readable = false;
                this.emit('rename');
                this._offset = 0; // reset on rename
            }
            this._idle += this._interval;
            this._retryInInterval();
            return;
        }

        this.readable = false;

        this.emit('error', error);
    }

    /** Forwards a chunk, advances the offset and resets the idle counter. */
    _handleData(data) {
        if (this._ended) {
            return;
        }

        this.readable = true;

        this._offset += data.length;
        this._idle = 0;

        debug(`${this._filepath}: reading ${data.length} bytes`);
        this.emit('data', data);
    }

    /**
     * Called when the underlying stream reaches EOF: starts the truncation
     * watcher if needed and schedules the next read, or ends the stream once
     * the idle timeout has elapsed.
     */
    _handleEnd() {
        this._reading = false;

        if (this._ended) {
            return;
        }

        if (!this._watching) {
            this._watching = true;
            this._watchFile();
        }

        if (!this._hasTimedOut()) {
            this._retryInInterval();
            return;
        }

        this.destroy();
        this.emit('end');
    }

    _hasTimedOut() {
        return this._idle >= this._timeout;
    }

    /**
     * Polls the file size every `watchinterval` ms while the stream is
     * readable. When the file is shorter than the current offset, emits
     * 'truncate' and moves the offset to the start (small file) or to the
     * new end of the file.
     */
    _watchFile() {
        if (!this.readable) {
            this._watching = false;
            return;
        }

        this._getFileSize((err, size) => {
            if (err) {
                return setTimeout(this._watchFile.bind(this), this._watchinterval);
            }

            if (size < this._offset) {
                this.emit('truncate', size);
                if (size < this._tailheadsize) {
                    debug(`${this._filepath}: file truncated but smaller than ${this._tailheadsize} bytes, reading from beginning`);
                    this._offset = 0;
                } else {
                    debug(`${this._filepath}: file truncated but larger than ${this._tailheadsize} bytes, reading from ${size}`);
                    this._offset = size;
                }
            }

            setTimeout(this._watchFile.bind(this), this._watchinterval);
        });
    }
}

module.exports = TailReadStream;

0 comments on commit d5314e0

Please sign in to comment.