Skip to content

Commit

Permalink
Refactor Connection Management and Update Unit Tests for Improved Rob…
Browse files Browse the repository at this point in the history
…ustness (#78)

* Enhance connection tests to improve code coverage and reliability
* Refactor manager tests and add jest watch script
* decouple connection and manager 
* Added a 'test:watch' script to package.json to enable running jest in watch mode.
* Refactored the manager tests to improve readability and test coverage.
* Updated the type of logQueue from a tuple to an Array in manager.ts.
* Fix js file formatting
* Add command for run with coverage
* Expand Manager and Connection Test Suite
  • Loading branch information
jaakkos committed May 29, 2023
1 parent 8294de4 commit 15f3519
Show file tree
Hide file tree
Showing 15 changed files with 333 additions and 214 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ test/runner.js
test/support/logstash/*

!test-bench/logstash/logstash/output/.gitkeep
test-bench/logstash/logstash/output/sample.log
test-bench/logstash/logstash/output/sample.log

coverage/
65 changes: 42 additions & 23 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,56 +3,67 @@
Object.defineProperty(exports, "__esModule", {
value: true
});
exports.SecureConnection = exports.PlainConnection = exports.Connection = void 0;
exports.SecureConnection = exports.PlainConnection = exports.ConnectionEvents = exports.ConnectionActions = exports.Connection = void 0;
var _net = require("net");
var _fs = require("fs");
var _tls = _interopRequireDefault(require("tls"));
var _events = require("events");
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; }
var ConnectionActions;
let ConnectionActions;
exports.ConnectionActions = ConnectionActions;
(function (ConnectionActions) {
ConnectionActions["Initializing"] = "Initializing";
ConnectionActions["Connecting"] = "Connecting";
ConnectionActions["Closing"] = "Closing";
ConnectionActions["Tranferring"] = "Transferring";
ConnectionActions["HandlingError"] = "HandlingError";
})(ConnectionActions || (ConnectionActions = {}));
class Connection {
constructor(options, manager) {
})(ConnectionActions || (exports.ConnectionActions = ConnectionActions = {}));
let ConnectionEvents;
exports.ConnectionEvents = ConnectionEvents;
(function (ConnectionEvents) {
ConnectionEvents["Connected"] = "connection:connected";
ConnectionEvents["Closed"] = "connection:closed";
ConnectionEvents["ClosedByServer"] = "connection:closed:by-server";
ConnectionEvents["Error"] = "connection:error";
ConnectionEvents["Timeout"] = "connection:timeout";
ConnectionEvents["Drain"] = "connection:drain";
})(ConnectionEvents || (exports.ConnectionEvents = ConnectionEvents = {}));
class Connection extends _events.EventEmitter {
constructor(options) {
var _options$host, _options$port;
super();
_defineProperty(this, "socket", void 0);
_defineProperty(this, "host", void 0);
_defineProperty(this, "port", void 0);
_defineProperty(this, "manager", void 0);
_defineProperty(this, "action", void 0);
this.action = ConnectionActions.Initializing;
this.manager = manager;
this.host = (_options$host = options === null || options === void 0 ? void 0 : options.host) !== null && _options$host !== void 0 ? _options$host : '127.0.0.1';
this.port = (_options$port = options === null || options === void 0 ? void 0 : options.port) !== null && _options$port !== void 0 ? _options$port : 28777;
}
socketOnError(error) {
this.action = ConnectionActions.HandlingError;
this.manager.emit('connection:error', error);
this.emit(ConnectionEvents.Error, error);
}
socketOnTimeout() {
var _this$socket;
this.action = ConnectionActions.HandlingError;
this.manager.emit('connection:timeout', (_this$socket = this.socket) === null || _this$socket === void 0 ? void 0 : _this$socket.readyState);
this.emit(ConnectionEvents.Timeout, (_this$socket = this.socket) === null || _this$socket === void 0 ? void 0 : _this$socket.readyState);
}
socketOnConnect() {
var _this$socket2;
(_this$socket2 = this.socket) === null || _this$socket2 === void 0 ? void 0 : _this$socket2.setKeepAlive(true, 60 * 1000);
this.action = ConnectionActions.Tranferring;
this.manager.emit('connection:connected');
this.emit(ConnectionEvents.Connected);
}
socketOnDrain() {
this.manager.emit('connection:drain');
this.emit(ConnectionEvents.Drain);
}
socketOnClose(error) {
if (this.action === ConnectionActions.Closing) {
this.manager.emit('connection:closed', error);
this.emit(ConnectionEvents.Closed, error);
} else {
this.manager.emit('connection:closed:by-server', error);
this.emit(ConnectionEvents.ClosedByServer, error);
}
}
addEventListeners(socket) {
Expand All @@ -66,7 +77,7 @@ class Connection {
this.action = ConnectionActions.Closing;
(_this$socket3 = this.socket) === null || _this$socket3 === void 0 ? void 0 : _this$socket3.removeAllListeners();
(_this$socket4 = this.socket) === null || _this$socket4 === void 0 ? void 0 : _this$socket4.destroy();
this.manager.emit('connection:closed');
this.emit(ConnectionEvents.Closed);
}
send(message, writeCallback) {
var _this$socket5;
Expand All @@ -84,16 +95,20 @@ exports.Connection = Connection;
class PlainConnection extends Connection {
connect() {
super.connect();
this.socket = new _net.Socket();
super.addEventListeners(this.socket);
this.socket.on('connect', super.socketOnConnect.bind(this));
this.socket.connect(this.port, this.host);
try {
this.socket = new _net.Socket();
super.addEventListeners(this.socket);
this.socket.once('connect', super.socketOnConnect.bind(this));
this.socket.connect(this.port, this.host);
} catch (error) {
this.emit(ConnectionEvents.Error, error);
}
}
}
exports.PlainConnection = PlainConnection;
class SecureConnection extends Connection {
constructor(options, manager) {
super(options, manager);
constructor(options) {
super(options);
_defineProperty(this, "secureContextOptions", void 0);
this.secureContextOptions = SecureConnection.createSecureContextOptions(options);
}
Expand All @@ -114,9 +129,13 @@ class SecureConnection extends Connection {
}
connect() {
super.connect();
this.socket = _tls.default.connect(this.port, this.host, this.secureContextOptions);
super.addEventListeners(this.socket);
this.socket.on('secureConnect', super.socketOnConnect.bind(this));
try {
this.socket = _tls.default.connect(this.port, this.host, this.secureContextOptions);
super.addEventListeners(this.socket);
this.socket.once('secureConnect', super.socketOnConnect.bind(this));
} catch (error) {
this.emit(ConnectionEvents.Error, error);
}
}
}
exports.SecureConnection = SecureConnection;
62 changes: 26 additions & 36 deletions lib/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,18 @@ var _events = require("events");
function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; }
const ECONNREFUSED_REGEXP = /ECONNREFUSED/;
class Manager extends _events.EventEmitter {
constructor(options) {
constructor(options, connection) {
var _options$max_connect_, _options$timeout_conn;
super();
_defineProperty(this, "connection", void 0);
_defineProperty(this, "logQueue", void 0);
_defineProperty(this, "options", void 0);
_defineProperty(this, "useSecureConnection", void 0);
_defineProperty(this, "retries", -1);
_defineProperty(this, "maxConnectRetries", void 0);
_defineProperty(this, "timeoutConnectRetries", void 0);
_defineProperty(this, "retryTimeout", undefined);
this.options = options;
this.useSecureConnection = (options === null || options === void 0 ? void 0 : options.ssl_enable) === true;
this.connection = connection;
this.logQueue = new Array();

// Connection retry attributes
Expand All @@ -30,27 +29,20 @@ class Manager extends _events.EventEmitter {
this.timeoutConnectRetries = (_options$timeout_conn = options === null || options === void 0 ? void 0 : options.timeout_connect_retries) !== null && _options$timeout_conn !== void 0 ? _options$timeout_conn : 100;
}
addEventListeners() {
this.once('connection:connected', this.onConnected.bind(this));
this.once('connection:closed', this.onConnectionClosed.bind(this));
this.once('connection:closed:by-server', this.onConnectionError.bind(this));
this.once('connection:error', this.onConnectionError.bind(this));
this.once('connection:timeout', this.onConnectionError.bind(this));
this.on('connection:drain', this.flush.bind(this));
this.connection.once(_connection.ConnectionEvents.Connected, this.onConnected.bind(this));
this.connection.once(_connection.ConnectionEvents.Closed, this.onConnectionClosed.bind(this));
this.connection.once(_connection.ConnectionEvents.ClosedByServer, this.onConnectionError.bind(this));
this.connection.once(_connection.ConnectionEvents.Error, this.onConnectionError.bind(this));
this.connection.once(_connection.ConnectionEvents.Timeout, this.onConnectionError.bind(this));
this.connection.on(_connection.ConnectionEvents.Drain, this.flush.bind(this));
}
removeEventListeners() {
this.off('connection:connected', this.onConnected.bind(this));
this.off('connection:closed', this.onConnectionClosed.bind(this));
this.off('connection:closed:by-server', this.onConnectionError.bind(this));
this.off('connection:error', this.onConnectionError.bind(this));
this.off('connection:timeout', this.onConnectionError.bind(this));
this.off('connection:drain', this.flush.bind(this));
}
createConnection() {
if (this.useSecureConnection) {
return new _connection.SecureConnection(this.options, this);
} else {
return new _connection.PlainConnection(this.options, this);
}
this.connection.off(_connection.ConnectionEvents.Connected, this.onConnected.bind(this));
this.connection.off(_connection.ConnectionEvents.Closed, this.onConnectionClosed.bind(this));
this.connection.off(_connection.ConnectionEvents.ClosedByServer, this.onConnectionError.bind(this));
this.connection.off(_connection.ConnectionEvents.Error, this.onConnectionError.bind(this));
this.connection.off(_connection.ConnectionEvents.Timeout, this.onConnectionError.bind(this));
this.connection.off(_connection.ConnectionEvents.Drain, this.flush.bind(this));
}
onConnected() {
this.emit('connected');
Expand Down Expand Up @@ -89,46 +81,44 @@ class Manager extends _events.EventEmitter {
}
}
retry() {
var _this$connection2;
if (this.retryTimeout) {
clearTimeout(this.retryTimeout);
}
this.emit('retrying');
this.removeEventListeners();
const self = this;
this.once('connection:closed', () => {
this.connection.once(_connection.ConnectionEvents.Closed, () => {
self.removeEventListeners();
self.retryTimeout = setTimeout(() => {
self.connection = undefined;
self.start();
}, self.timeoutConnectRetries);
});
(_this$connection2 = this.connection) === null || _this$connection2 === void 0 ? void 0 : _this$connection2.close();
this.connection.close();
}
setConnection(connection) {
this.connection = connection;
}
start() {
if (!this.connection) {
this.retries++;
this.connection = this.createConnection();
this.addEventListeners();
this.connection.connect();
}
this.retries++;
this.addEventListeners();
this.connection.connect();
}
log(entry, callback) {
this.logQueue.push([entry, callback]);
process.nextTick(this.flush.bind(this));
}
close() {
var _this$connection3;
var _this$connection2;
this.emit('closing');
this.flush();
this.removeEventListeners();
(_this$connection3 = this.connection) === null || _this$connection3 === void 0 ? void 0 : _this$connection3.close();
(_this$connection2 = this.connection) === null || _this$connection2 === void 0 ? void 0 : _this$connection2.close();
}
flush() {
this.emit('flushing');
let connectionIsDrained = true;
while (this.logQueue.length && connectionIsDrained && (_this$connection4 = this.connection) !== null && _this$connection4 !== void 0 && _this$connection4.readyToSend()) {
var _this$connection4;
while (this.logQueue.length && connectionIsDrained && (_this$connection3 = this.connection) !== null && _this$connection3 !== void 0 && _this$connection3.readyToSend()) {
var _this$connection3;
const logEntry = this.logQueue.shift();
if (logEntry) {
const [entry, callback] = logEntry;
Expand Down
5 changes: 4 additions & 1 deletion lib/winston-logstash-latest.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

var _winstonTransport = _interopRequireDefault(require("winston-transport"));
var _manager = require("./manager");
var _connection = require("./connection");
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; }
//
Expand All @@ -12,9 +13,11 @@ module.exports = class LogstashTransport extends _winstonTransport.default {
constructor(options) {
super(options);
_defineProperty(this, "manager", void 0);
_defineProperty(this, "connection", void 0);
_defineProperty(this, "name", void 0);
this.name = 'logstash';
this.manager = new _manager.Manager(options);
this.connection = options.ssl_enable ? new _connection.SecureConnection(options) : new _connection.PlainConnection(options);
this.manager = new _manager.Manager(options, this.connection);
this.manager.on('error', this.onError.bind(this));
this.manager.start();
}
Expand Down
5 changes: 4 additions & 1 deletion lib/winston-logstash.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Object.defineProperty(exports, "__esModule", {
exports.Logstash = void 0;
var _winston = require("winston");
var _manager = require("./manager");
var _connection = require("./connection");
function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; }
const common = require('winston/lib/winston/common');
class Logstash extends _winston.Transport {
Expand All @@ -16,13 +17,15 @@ class Logstash extends _winston.Transport {
_defineProperty(this, "label", void 0);
_defineProperty(this, "meta_defaults", void 0);
_defineProperty(this, "manager", void 0);
_defineProperty(this, "connection", void 0);
this.name = 'logstash';
this.node_name = options.node_name || process.title;

// Miscellaneous options
this.label = options.label || this.node_name;
this.meta_defaults = Object.assign({}, options.meta);
this.manager = new _manager.Manager(options);
this.connection = options.ssl_enable ? new _connection.SecureConnection(options) : new _connection.PlainConnection(options);
this.manager = new _manager.Manager(options, this.connection);
this.manager.on('error', this.onError.bind(this));
this.manager.start();
}
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
"homepage": "https://github.com/jaakkos/winston-logstash",
"scripts": {
"test": "jest",
"lint": "eslint .",
"test:watch": "jest --watch",
"test:coverage": "jest --coverage",
"watch": "babel src --out-dir lib --extensions '.ts' --watch",
"build": "babel src --out-dir lib --extensions '.ts'",
"version": "auto-changelog -p && git add CHANGELOG.md"
Expand Down
Loading

0 comments on commit 15f3519

Please sign in to comment.