diff --git a/.gitignore b/.gitignore index 2ac9af7..9a9ef31 100644 --- a/.gitignore +++ b/.gitignore @@ -30,4 +30,6 @@ test/runner.js test/support/logstash/* !test-bench/logstash/logstash/output/.gitkeep -test-bench/logstash/logstash/output/sample.log \ No newline at end of file +test-bench/logstash/logstash/output/sample.log + +coverage/ diff --git a/lib/connection.js b/lib/connection.js index 19c92e8..6ac2d23 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -3,56 +3,67 @@ Object.defineProperty(exports, "__esModule", { value: true }); -exports.SecureConnection = exports.PlainConnection = exports.Connection = void 0; +exports.SecureConnection = exports.PlainConnection = exports.ConnectionEvents = exports.ConnectionActions = exports.Connection = void 0; var _net = require("net"); var _fs = require("fs"); var _tls = _interopRequireDefault(require("tls")); +var _events = require("events"); function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; } function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; } -var ConnectionActions; +let ConnectionActions; +exports.ConnectionActions = ConnectionActions; (function (ConnectionActions) { ConnectionActions["Initializing"] = "Initializing"; ConnectionActions["Connecting"] = "Connecting"; ConnectionActions["Closing"] = "Closing"; ConnectionActions["Tranferring"] = "Transferring"; ConnectionActions["HandlingError"] = "HandlingError"; -})(ConnectionActions || (ConnectionActions = {})); -class Connection { - constructor(options, manager) { +})(ConnectionActions || (exports.ConnectionActions = ConnectionActions = {})); +let ConnectionEvents; +exports.ConnectionEvents = ConnectionEvents; +(function (ConnectionEvents) { + ConnectionEvents["Connected"] = "connection:connected"; + ConnectionEvents["Closed"] = "connection:closed"; + ConnectionEvents["ClosedByServer"] = "connection:closed:by-server"; + ConnectionEvents["Error"] = "connection:error"; + ConnectionEvents["Timeout"] = "connection:timeout"; + ConnectionEvents["Drain"] = "connection:drain"; +})(ConnectionEvents || (exports.ConnectionEvents = ConnectionEvents = {})); +class Connection extends _events.EventEmitter { + constructor(options) { var _options$host, _options$port; + super(); _defineProperty(this, "socket", void 0); _defineProperty(this, "host", void 0); _defineProperty(this, "port", void 0); - _defineProperty(this, "manager", void 0); _defineProperty(this, "action", void 0); this.action = ConnectionActions.Initializing; - this.manager = manager; this.host = (_options$host = options === null || options === void 0 ? void 0 : options.host) !== null && _options$host !== void 0 ? _options$host : '127.0.0.1'; this.port = (_options$port = options === null || options === void 0 ? void 0 : options.port) !== null && _options$port !== void 0 ? _options$port : 28777; } socketOnError(error) { this.action = ConnectionActions.HandlingError; - this.manager.emit('connection:error', error); + this.emit(ConnectionEvents.Error, error); } socketOnTimeout() { var _this$socket; this.action = ConnectionActions.HandlingError; - this.manager.emit('connection:timeout', (_this$socket = this.socket) === null || _this$socket === void 0 ? void 0 : _this$socket.readyState); + this.emit(ConnectionEvents.Timeout, (_this$socket = this.socket) === null || _this$socket === void 0 ? void 0 : _this$socket.readyState); } socketOnConnect() { var _this$socket2; (_this$socket2 = this.socket) === null || _this$socket2 === void 0 ? void 0 : _this$socket2.setKeepAlive(true, 60 * 1000); this.action = ConnectionActions.Tranferring; - this.manager.emit('connection:connected'); + this.emit(ConnectionEvents.Connected); } socketOnDrain() { - this.manager.emit('connection:drain'); + this.emit(ConnectionEvents.Drain); } socketOnClose(error) { if (this.action === ConnectionActions.Closing) { - this.manager.emit('connection:closed', error); + this.emit(ConnectionEvents.Closed, error); } else { - this.manager.emit('connection:closed:by-server', error); + this.emit(ConnectionEvents.ClosedByServer, error); } } addEventListeners(socket) { @@ -66,7 +77,7 @@ class Connection { this.action = ConnectionActions.Closing; (_this$socket3 = this.socket) === null || _this$socket3 === void 0 ? void 0 : _this$socket3.removeAllListeners(); (_this$socket4 = this.socket) === null || _this$socket4 === void 0 ? void 0 : _this$socket4.destroy(); - this.manager.emit('connection:closed'); + this.emit(ConnectionEvents.Closed); } send(message, writeCallback) { var _this$socket5; @@ -84,16 +95,20 @@ exports.Connection = Connection; class PlainConnection extends Connection { connect() { super.connect(); - this.socket = new _net.Socket(); - super.addEventListeners(this.socket); - this.socket.on('connect', super.socketOnConnect.bind(this)); - this.socket.connect(this.port, this.host); + try { + this.socket = new _net.Socket(); + super.addEventListeners(this.socket); + this.socket.once('connect', super.socketOnConnect.bind(this)); + this.socket.connect(this.port, this.host); + } catch (error) { + this.emit(ConnectionEvents.Error, error); + } } } exports.PlainConnection = PlainConnection; class SecureConnection extends Connection { - constructor(options, manager) { - super(options, manager); + constructor(options) { + super(options); _defineProperty(this, "secureContextOptions", void 0); this.secureContextOptions = SecureConnection.createSecureContextOptions(options); } @@ -114,9 +129,13 @@ class SecureConnection extends Connection { } connect() { super.connect(); - this.socket = _tls.default.connect(this.port, this.host, this.secureContextOptions); - super.addEventListeners(this.socket); - this.socket.on('secureConnect', super.socketOnConnect.bind(this)); + try { + this.socket = _tls.default.connect(this.port, this.host, this.secureContextOptions); + super.addEventListeners(this.socket); + this.socket.once('secureConnect', super.socketOnConnect.bind(this)); + } catch (error) { + this.emit(ConnectionEvents.Error, error); + } } } exports.SecureConnection = SecureConnection; \ No newline at end of file diff --git a/lib/manager.js b/lib/manager.js index ff6152b..4eca35e 100644 --- a/lib/manager.js +++ b/lib/manager.js @@ -9,19 +9,18 @@ var _events = require("events"); function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; } const ECONNREFUSED_REGEXP = /ECONNREFUSED/; class Manager extends _events.EventEmitter { - constructor(options) { + constructor(options, connection) { var _options$max_connect_, _options$timeout_conn; super(); _defineProperty(this, "connection", void 0); _defineProperty(this, "logQueue", void 0); _defineProperty(this, "options", void 0); - _defineProperty(this, "useSecureConnection", void 0); _defineProperty(this, "retries", -1); _defineProperty(this, "maxConnectRetries", void 0); _defineProperty(this, "timeoutConnectRetries", void 0); _defineProperty(this, "retryTimeout", undefined); this.options = options; - this.useSecureConnection = (options === null || options === void 0 ? void 0 : options.ssl_enable) === true; + this.connection = connection; this.logQueue = new Array(); // Connection retry attributes @@ -30,27 +29,20 @@ class Manager extends _events.EventEmitter { this.timeoutConnectRetries = (_options$timeout_conn = options === null || options === void 0 ? void 0 : options.timeout_connect_retries) !== null && _options$timeout_conn !== void 0 ? _options$timeout_conn : 100; } addEventListeners() { - this.once('connection:connected', this.onConnected.bind(this)); - this.once('connection:closed', this.onConnectionClosed.bind(this)); - this.once('connection:closed:by-server', this.onConnectionError.bind(this)); - this.once('connection:error', this.onConnectionError.bind(this)); - this.once('connection:timeout', this.onConnectionError.bind(this)); - this.on('connection:drain', this.flush.bind(this)); + this.connection.once(_connection.ConnectionEvents.Connected, this.onConnected.bind(this)); + this.connection.once(_connection.ConnectionEvents.Closed, this.onConnectionClosed.bind(this)); + this.connection.once(_connection.ConnectionEvents.ClosedByServer, this.onConnectionError.bind(this)); + this.connection.once(_connection.ConnectionEvents.Error, this.onConnectionError.bind(this)); + this.connection.once(_connection.ConnectionEvents.Timeout, this.onConnectionError.bind(this)); + this.connection.on(_connection.ConnectionEvents.Drain, this.flush.bind(this)); } removeEventListeners() { - this.off('connection:connected', this.onConnected.bind(this)); - this.off('connection:closed', this.onConnectionClosed.bind(this)); - this.off('connection:closed:by-server', this.onConnectionError.bind(this)); - this.off('connection:error', this.onConnectionError.bind(this)); - this.off('connection:timeout', this.onConnectionError.bind(this)); - this.off('connection:drain', this.flush.bind(this)); - } - createConnection() { - if (this.useSecureConnection) { - return new _connection.SecureConnection(this.options, this); - } else { - return new _connection.PlainConnection(this.options, this); - } + this.connection.off(_connection.ConnectionEvents.Connected, this.onConnected.bind(this)); + this.connection.off(_connection.ConnectionEvents.Closed, this.onConnectionClosed.bind(this)); + this.connection.off(_connection.ConnectionEvents.ClosedByServer, this.onConnectionError.bind(this)); + this.connection.off(_connection.ConnectionEvents.Error, this.onConnectionError.bind(this)); + this.connection.off(_connection.ConnectionEvents.Timeout, this.onConnectionError.bind(this)); + this.connection.off(_connection.ConnectionEvents.Drain, this.flush.bind(this)); } onConnected() { this.emit('connected'); @@ -89,46 +81,44 @@ class Manager extends _events.EventEmitter { } } retry() { - var _this$connection2; if (this.retryTimeout) { clearTimeout(this.retryTimeout); } this.emit('retrying'); this.removeEventListeners(); const self = this; - this.once('connection:closed', () => { + this.connection.once(_connection.ConnectionEvents.Closed, () => { self.removeEventListeners(); self.retryTimeout = setTimeout(() => { - self.connection = undefined; self.start(); }, self.timeoutConnectRetries); }); - (_this$connection2 = this.connection) === null || _this$connection2 === void 0 ? void 0 : _this$connection2.close(); + this.connection.close(); + } + setConnection(connection) { + this.connection = connection; } start() { - if (!this.connection) { - this.retries++; - this.connection = this.createConnection(); - this.addEventListeners(); - this.connection.connect(); - } + this.retries++; + this.addEventListeners(); + this.connection.connect(); } log(entry, callback) { this.logQueue.push([entry, callback]); process.nextTick(this.flush.bind(this)); } close() { - var _this$connection3; + var _this$connection2; this.emit('closing'); this.flush(); this.removeEventListeners(); - (_this$connection3 = this.connection) === null || _this$connection3 === void 0 ? void 0 : _this$connection3.close(); + (_this$connection2 = this.connection) === null || _this$connection2 === void 0 ? void 0 : _this$connection2.close(); } flush() { this.emit('flushing'); let connectionIsDrained = true; - while (this.logQueue.length && connectionIsDrained && (_this$connection4 = this.connection) !== null && _this$connection4 !== void 0 && _this$connection4.readyToSend()) { - var _this$connection4; + while (this.logQueue.length && connectionIsDrained && (_this$connection3 = this.connection) !== null && _this$connection3 !== void 0 && _this$connection3.readyToSend()) { + var _this$connection3; const logEntry = this.logQueue.shift(); if (logEntry) { const [entry, callback] = logEntry; diff --git a/lib/winston-logstash-latest.js b/lib/winston-logstash-latest.js index a24bcfb..583e358 100644 --- a/lib/winston-logstash-latest.js +++ b/lib/winston-logstash-latest.js @@ -2,6 +2,7 @@ var _winstonTransport = _interopRequireDefault(require("winston-transport")); var _manager = require("./manager"); +var _connection = require("./connection"); function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; } function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; } // @@ -12,9 +13,11 @@ module.exports = class LogstashTransport extends _winstonTransport.default { constructor(options) { super(options); _defineProperty(this, "manager", void 0); + _defineProperty(this, "connection", void 0); _defineProperty(this, "name", void 0); this.name = 'logstash'; - this.manager = new _manager.Manager(options); + this.connection = options.ssl_enable ? new _connection.SecureConnection(options) : new _connection.PlainConnection(options); + this.manager = new _manager.Manager(options, this.connection); this.manager.on('error', this.onError.bind(this)); this.manager.start(); } diff --git a/lib/winston-logstash.js b/lib/winston-logstash.js index 1d553d2..eb32673 100644 --- a/lib/winston-logstash.js +++ b/lib/winston-logstash.js @@ -6,6 +6,7 @@ Object.defineProperty(exports, "__esModule", { exports.Logstash = void 0; var _winston = require("winston"); var _manager = require("./manager"); +var _connection = require("./connection"); function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; } const common = require('winston/lib/winston/common'); class Logstash extends _winston.Transport { @@ -16,13 +17,15 @@ class Logstash extends _winston.Transport { _defineProperty(this, "label", void 0); _defineProperty(this, "meta_defaults", void 0); _defineProperty(this, "manager", void 0); + _defineProperty(this, "connection", void 0); this.name = 'logstash'; this.node_name = options.node_name || process.title; // Miscellaneous options this.label = options.label || this.node_name; this.meta_defaults = Object.assign({}, options.meta); - this.manager = new _manager.Manager(options); + this.connection = options.ssl_enable ? new _connection.SecureConnection(options) : new _connection.PlainConnection(options); + this.manager = new _manager.Manager(options, this.connection); this.manager.on('error', this.onError.bind(this)); this.manager.start(); } diff --git a/package.json b/package.json index 27c8dad..dd9871b 100644 --- a/package.json +++ b/package.json @@ -6,7 +6,8 @@ "homepage": "https://github.com/jaakkos/winston-logstash", "scripts": { "test": "jest", - "lint": "eslint .", + "test:watch": "jest --watch", + "test:coverage": "jest --coverage", "watch": "babel src --out-dir lib --extensions '.ts' --watch", "build": "babel src --out-dir lib --extensions '.ts'", "version": "auto-changelog -p && git add CHANGELOG.md" diff --git a/src/connection.test.ts b/src/connection.test.ts index 0e4d6ff..a16ae5b 100644 --- a/src/connection.test.ts +++ b/src/connection.test.ts @@ -1,5 +1,4 @@ -import { Connection, PlainConnection, SecureConnection } from './connection'; -import { Manager } from './manager'; +import { Connection, PlainConnection, SecureConnection, ConnectionActions, ConnectionEvents } from './connection'; import net from 'net'; import tls from 'tls'; import { EventEmitter } from 'events'; @@ -7,57 +6,110 @@ import { sslFilePath } from '../test/test_helper' jest.mock('net'); jest.mock('tls'); -jest.mock('./manager'); const MockedNet = net as jest.Mocked; const MockedTls = tls as jest.Mocked; -const MockedManager = Manager as jest.MockedClass; + +let socket: net.Socket & { emit: jest.Mock, readyState: string }; +let options: { host: string, port: number }; beforeEach(() => { MockedNet.Socket.mockClear(); MockedTls.connect.mockClear(); - MockedManager.mockClear(); + + options = { host: 'localhost', port: 12345 }; + + socket = new EventEmitter() as net.Socket & { emit: jest.Mock, readyState: string }; + socket.write = jest.fn().mockReturnValue(true); + socket.removeAllListeners = jest.fn(); + socket.destroy = jest.fn(); + socket.connect = jest.fn().mockImplementation(() => { + socket.emit('connect') + }); + socket.readyState = 'open'; }); describe('Connection', () => { - // @ts-ignore - const manager = new Manager(); const options = { host: 'localhost', port: 12345 }; - const connection = new Connection(options, manager); - test('initializes with provided options', () => { - // @ts-ignore - expect(connection.host).toBe(options.host); - // @ts-ignore - expect(connection.port).toBe(options.port); - }); + describe('PlainConnection', () => { + let connection: PlainConnection; + + beforeEach(() => { + connection = new PlainConnection(options); + }); + + test('initializes with provided options', () => { + expect(connection['host']).toBe(options.host); + expect(connection['port']).toBe(options.port); + }); + + test('can send a message', () => { + connection['socket'] = socket; + const message = 'test message'; + const callback = jest.fn(); - test('can send a message', () => { - const socket = new EventEmitter() as net.Socket; - // @ts-ignore - socket.readyState = 'open'; - socket.write = jest.fn().mockReturnValue(true); - // @ts-ignore - connection.socket = socket; - const message = 'test message'; - const callback = jest.fn(); + const result = connection.send(message, callback); - const result = connection.send(message, callback); + expect(result).toBe(true); + expect(socket.write).toHaveBeenCalledWith(Buffer.from(message), callback); + }); + + test('can close connection', () => { + connection['socket'] = socket; + connection.close(); + + expect(socket.removeAllListeners).toHaveBeenCalled(); + expect(socket.destroy).toHaveBeenCalled(); + expect(connection['action']).toBe(ConnectionActions.Closing); + }); + + test('can connect to server', () => { + MockedNet.Socket.mockReturnValue(socket as any); + connection.connect(); + expect(MockedNet.Socket).toHaveBeenCalledTimes(1); + expect(connection['action']).toBe(ConnectionActions.Connecting); + }); + + test('checks if connection is ready to send', () => { + connection['socket'] = socket; + expect(connection.readyToSend()).toBe(true); + }); - expect(result).toBe(true); - expect(socket.write).toHaveBeenCalledWith(Buffer.from(message), callback); }); - test('can close connection', () => { - const socket = new EventEmitter() as net.Socket; - socket.removeAllListeners = jest.fn(); - socket.destroy = jest.fn(); - // @ts-ignore - connection.socket = socket; + describe('SecureConnection', () => { + let connection: SecureConnection; + const sslOptions = { + ...options, + ssl_key: sslFilePath('client.key'), + ssl_cert: sslFilePath('client.cert'), + ca: sslFilePath('ca.cert') + }; + + beforeEach(() => { + connection = new SecureConnection(sslOptions); + }); + + test('initializes with provided options and SSL options', () => { + expect(connection['host']).toBe(sslOptions.host); + expect(connection['port']).toBe(sslOptions.port); + expect(connection['secureContextOptions'].key).toBeDefined(); + expect(connection['secureContextOptions'].cert).toBeDefined(); + expect(connection['secureContextOptions'].ca).toBeDefined(); + }); + + test('can connect to secure server', () => { + MockedTls.connect.mockReturnValue(socket as any); + connection.connect(); + expect(MockedTls.connect).toHaveBeenCalledTimes(1); + expect(connection['action']).toBe(ConnectionActions.Connecting); + }); - connection.close(); + test('checks if secure```javascript connection is ready to send', () => { + connection['socket'] = socket; + expect(connection.readyToSend()).toBe(true); + }); - expect(socket.removeAllListeners).toHaveBeenCalled(); - expect(socket.destroy).toHaveBeenCalled(); }); }); diff --git a/src/connection.ts b/src/connection.ts index ee45bf4..8465d36 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -5,14 +5,14 @@ * */ -import {Socket} from 'net' -import {readFileSync} from 'fs' +import { Socket } from 'net' +import { readFileSync } from 'fs' import tls from 'tls'; import { WinstonModuleTransportOptions } from 'winston'; -import { Manager } from './manager'; import { LogstashTransportSSLOptions } from './types'; +import { EventEmitter } from 'events'; -enum ConnectionActions { +export enum ConnectionActions { Initializing = "Initializing", Connecting = "Connecting", Closing = "Closing", @@ -20,45 +20,60 @@ enum ConnectionActions { HandlingError = "HandlingError" } -export class Connection { +export enum ConnectionEvents { + Connected = "connection:connected", + Closed = "connection:closed", + ClosedByServer = "connection:closed:by-server", + Error = "connection:error", + Timeout = "connection:timeout", + Drain = "connection:drain" +} + +export interface IConnection extends EventEmitter { + connect(): void; + close(): void; + send(message: string, callback: Function): boolean; + readyToSend(): boolean; +} + +export abstract class Connection extends EventEmitter implements IConnection { protected socket: Socket | undefined; protected host: string; protected port: number; - protected manager: Manager; protected action: ConnectionActions; - constructor(options: WinstonModuleTransportOptions, manager: Manager) { + constructor(options: WinstonModuleTransportOptions) { + super(); this.action = ConnectionActions.Initializing; - this.manager = manager; this.host = options?.host ?? '127.0.0.1'; this.port = options?.port ?? 28777; } private socketOnError(error: Error) { this.action = ConnectionActions.HandlingError; - this.manager.emit('connection:error', error); + this.emit(ConnectionEvents.Error, error); } private socketOnTimeout() { this.action = ConnectionActions.HandlingError; - this.manager.emit('connection:timeout', this.socket?.readyState); + this.emit(ConnectionEvents.Timeout, this.socket?.readyState); } protected socketOnConnect() { this.socket?.setKeepAlive(true, 60 * 1000); this.action = ConnectionActions.Tranferring; - this.manager.emit('connection:connected'); + this.emit(ConnectionEvents.Connected); } - protected socketOnDrain() { - this.manager.emit('connection:drain'); + private socketOnDrain() { + this.emit(ConnectionEvents.Drain); } private socketOnClose(error: Error) { if (this.action === ConnectionActions.Closing) { - this.manager.emit('connection:closed', error); + this.emit(ConnectionEvents.Closed, error); } else { - this.manager.emit('connection:closed:by-server', error); + this.emit(ConnectionEvents.ClosedByServer, error); } } @@ -69,17 +84,18 @@ export class Connection { socket.once('close', this.socketOnClose.bind(this)); } + close() { this.action = ConnectionActions.Closing; this.socket?.removeAllListeners(); this.socket?.destroy(); - this.manager.emit('connection:closed'); + this.emit(ConnectionEvents.Closed); } send(message: string, writeCallback: (error?: Error) => void): boolean { return this.socket?.write(Buffer.from(message), writeCallback) === true; } - + readyToSend(): boolean { return this.socket?.readyState === 'open'; } @@ -92,17 +108,21 @@ export class Connection { export class PlainConnection extends Connection { connect() { super.connect(); - this.socket = new Socket(); - super.addEventListeners(this.socket); - this.socket.on('connect', super.socketOnConnect.bind(this)); - this.socket.connect(this.port, this.host); + try { + this.socket = new Socket(); + super.addEventListeners(this.socket); + this.socket.once('connect', super.socketOnConnect.bind(this)); + this.socket.connect(this.port, this.host); + } catch (error) { + this.emit(ConnectionEvents.Error, error); + } } } export class SecureConnection extends Connection { private secureContextOptions: tls.ConnectionOptions; - constructor(options: WinstonModuleTransportOptions, manager: Manager) { - super(options, manager); + constructor(options: WinstonModuleTransportOptions) { + super(options); this.secureContextOptions = SecureConnection.createSecureContextOptions(options as LogstashTransportSSLOptions); } @@ -115,7 +135,7 @@ export class SecureConnection extends Connection { const rejectUnauthorized = options.rejectUnauthorized; const secureContextOptions = { - key: sslKey && readFileSync(sslKey) , + key: sslKey && readFileSync(sslKey), cert: sslCert && readFileSync(sslCert), passphrase: sslPassphrase || undefined, rejectUnauthorized: rejectUnauthorized!, @@ -127,10 +147,14 @@ export class SecureConnection extends Connection { connect() { super.connect(); - this.socket = tls.connect(this.port, + try { + this.socket = tls.connect(this.port, this.host, this.secureContextOptions); - super.addEventListeners(this.socket); - this.socket.on('secureConnect', super.socketOnConnect.bind(this)); + super.addEventListeners(this.socket); + this.socket.once('secureConnect', super.socketOnConnect.bind(this)); + } catch (error) { + this.emit(ConnectionEvents.Error, error); + } } } diff --git a/src/manager.test.ts b/src/manager.test.ts index 4f71a56..0297f8a 100644 --- a/src/manager.test.ts +++ b/src/manager.test.ts @@ -1,13 +1,14 @@ import { Manager } from './manager'; -import { Connection, PlainConnection, SecureConnection } from './connection'; +import { Connection, ConnectionEvents, PlainConnection, SecureConnection } from './connection'; jest.mock('./connection'); -const MockedConnection = Connection as jest.MockedClass; const MockedPlainConnection = PlainConnection as jest.MockedClass; const MockedSecureConnection = SecureConnection as jest.MockedClass; describe('Manager', () => { + let manager: Manager; + let connection: PlainConnection; const options = { host: 'localhost', port: 12345, @@ -17,54 +18,82 @@ describe('Manager', () => { }; beforeEach(() => { - MockedConnection.mockClear(); - MockedPlainConnection.mockClear(); - MockedSecureConnection.mockClear(); + jest.useFakeTimers(); + connection = new MockedPlainConnection(options); + manager = new Manager(options, connection); + connection.send = jest.fn().mockReturnValue(true); + connection.readyToSend = jest.fn().mockReturnValue(true); }); - test('initializes with provided options', () => { - const manager = new Manager(options); - expect(manager.options).toBe(options); - expect(manager.useSecureConnection).toBe(options.ssl_enable); - expect(manager.maxConnectRetries).toBe(options.max_connect_retries); - expect(manager.timeoutConnectRetries).toBe(options.timeout_connect_retries); - }); - - test('creates plain connection', () => { - const manager = new Manager(options); - manager.start(); - expect(PlainConnection).toHaveBeenCalledTimes(1); - expect(SecureConnection).toHaveBeenCalledTimes(0); + afterEach(() => { + jest.useRealTimers(); }); - test('creates secure connection', () => { - const manager = new Manager({ ...options, ssl_enable: true }); - manager.start(); - expect(PlainConnection).toHaveBeenCalledTimes(0); - expect(SecureConnection).toHaveBeenCalledTimes(1); + test('initializes with provided options', () => { + expect(manager['options']).toBe(options); + expect(manager['maxConnectRetries']).toBe(options.max_connect_retries); + expect(manager['timeoutConnectRetries']).toBe(options.timeout_connect_retries); }); test('logs an entry', () => { const logEntry = 'test log entry'; const callback = jest.fn(); - const manager = new Manager(options); - manager.connection = new PlainConnection(options, manager); - manager.connection.send = jest.fn().mockReturnValue(true); + manager.log(logEntry, callback); - expect(manager.logQueue).toHaveLength(1); - expect(manager.logQueue[0][0]).toBe(logEntry); + + expect(manager['logQueue']).toHaveLength(1); + expect(manager['logQueue'][0][0]).toBe(logEntry); }); test('flushes log queue', () => { const logEntry = 'test log entry'; const callback = jest.fn(); - const manager = new Manager(options); - manager.connection = new PlainConnection(options, manager); - manager.connection.send = jest.fn().mockReturnValue(true); - manager.connection.readyToSend = jest.fn().mockReturnValue(true); - manager.logQueue.push([logEntry, callback]); + manager['logQueue'].push([logEntry, callback]); + manager.flush(); - expect(manager.logQueue).toHaveLength(0); - expect(manager.connection.send).toHaveBeenCalledWith(logEntry + '\n', expect.any(Function)); + + expect(manager['logQueue']).toHaveLength(0); + expect(connection.send).toHaveBeenCalledWith(logEntry + '\n', expect.any(Function)); + }); + + test('should emit events when connection methods are called', () => { + const mockEventEmit = jest.spyOn(manager, 'emit'); + + manager['onConnected'](); + expect(mockEventEmit).toHaveBeenCalledWith('connected'); + + mockEventEmit.mockClear(); + + // @ts-ignore + manager.onConnectionClosed(new Error()); + expect(mockEventEmit).toHaveBeenCalledWith('closed'); + + }); + + test('should stop retrying after max retries are reached', () => { + const spyOnStart = jest.spyOn(manager, 'start'); + const error = new Error('Test error'); + + // Set the number of retries to the max. + manager['retries'] = manager['maxConnectRetries']; + + // Trigger an error on the connection. + connection.emit(ConnectionEvents.Error, error); + + jest.runAllTimers(); + + // Check that the manager's start method was not called. + expect(spyOnStart).not.toHaveBeenCalled(); + }); + + test('should close the manager', () => { + const spyOnClose = jest.spyOn(connection, 'close'); + const spyOnEmit = jest.spyOn(manager, 'emit'); + + manager.close(); + + expect(spyOnEmit).toHaveBeenCalledWith('closing'); + expect(spyOnClose).toHaveBeenCalled(); }); + }); diff --git a/src/manager.ts b/src/manager.ts index 0c78379..a8c4dbc 100644 --- a/src/manager.ts +++ b/src/manager.ts @@ -5,27 +5,25 @@ * */ -import {Connection, SecureConnection, PlainConnection} from './connection' +import { IConnection, ConnectionEvents } from './connection' import { EventEmitter } from 'events'; import { LogstashTransportOptions, LogEntry } from './types'; const ECONNREFUSED_REGEXP = /ECONNREFUSED/; export class Manager extends EventEmitter { - connection: Connection | undefined - logQueue: [String, Function][]; - options: LogstashTransportOptions; - useSecureConnection: Boolean; - retries: number = -1; - maxConnectRetries: number; - timeoutConnectRetries: number; - retryTimeout?: ReturnType = undefined; - - constructor(options: LogstashTransportOptions) { + private connection: IConnection + private logQueue: Array<[string, Function]>; + private options: LogstashTransportOptions; + private retries: number = -1; + private maxConnectRetries: number; + private timeoutConnectRetries: number; + private retryTimeout?: ReturnType = undefined; + + constructor(options: LogstashTransportOptions, connection: IConnection) { super(); this.options = options; - this.useSecureConnection = options?.ssl_enable === true; - + this.connection = connection; this.logQueue = new Array(); // Connection retry attributes @@ -35,29 +33,21 @@ export class Manager extends EventEmitter { } private addEventListeners() { - this.once('connection:connected', this.onConnected.bind(this)); - this.once('connection:closed', this.onConnectionClosed.bind(this)); - this.once('connection:closed:by-server', this.onConnectionError.bind(this)); - this.once('connection:error', this.onConnectionError.bind(this)); - this.once('connection:timeout', this.onConnectionError.bind(this)); - this.on('connection:drain', this.flush.bind(this)); + this.connection.once(ConnectionEvents.Connected, this.onConnected.bind(this)); + this.connection.once(ConnectionEvents.Closed, this.onConnectionClosed.bind(this)); + this.connection.once(ConnectionEvents.ClosedByServer, this.onConnectionError.bind(this)); + this.connection.once(ConnectionEvents.Error, this.onConnectionError.bind(this)); + this.connection.once(ConnectionEvents.Timeout, this.onConnectionError.bind(this)); + this.connection.on(ConnectionEvents.Drain, this.flush.bind(this)); } private removeEventListeners() { - this.off('connection:connected', this.onConnected.bind(this)); - this.off('connection:closed', this.onConnectionClosed.bind(this)); - this.off('connection:closed:by-server', this.onConnectionError.bind(this)); - this.off('connection:error', this.onConnectionError.bind(this)); - this.off('connection:timeout', this.onConnectionError.bind(this)); - this.off('connection:drain', this.flush.bind(this)); - } - - private createConnection() { - if (this.useSecureConnection) { - return new SecureConnection(this.options, this); - } else { - return new PlainConnection(this.options, this); - } + this.connection.off(ConnectionEvents.Connected, this.onConnected.bind(this)); + this.connection.off(ConnectionEvents.Closed, this.onConnectionClosed.bind(this)); + this.connection.off(ConnectionEvents.ClosedByServer, this.onConnectionError.bind(this)); + this.connection.off(ConnectionEvents.Error, this.onConnectionError.bind(this)); + this.connection.off(ConnectionEvents.Timeout, this.onConnectionError.bind(this)); + this.connection.off(ConnectionEvents.Drain, this.flush.bind(this)); } private onConnected() { @@ -80,7 +70,7 @@ export class Manager extends EventEmitter { private shouldTryToReconnect(error: Error) { if (this.isRetryableError(error) === true) { if (this.maxConnectRetries < 0 || - this.retries < this.maxConnectRetries) { + this.retries < this.maxConnectRetries) { return true; } else { return false; @@ -97,7 +87,7 @@ export class Manager extends EventEmitter { this.removeEventListeners(); this.connection?.close(); this.emit('error', - new Error('Max retries reached, transport in silent mode, OFFLINE')); + new Error('Max retries reached, transport in silent mode, OFFLINE')); } } @@ -110,24 +100,24 @@ export class Manager extends EventEmitter { this.removeEventListeners(); const self = this; - this.once('connection:closed', () => { + this.connection.once(ConnectionEvents.Closed, () => { self.removeEventListeners(); self.retryTimeout = setTimeout(() => { - self.connection = undefined self.start(); }, - self.timeoutConnectRetries); + self.timeoutConnectRetries); }); - this.connection?.close(); + this.connection.close(); + } + + public setConnection(connection: IConnection): void { + this.connection = connection; } start() { - if (!this.connection) { - this.retries++; - this.connection = this.createConnection(); - this.addEventListeners(); - this.connection.connect(); - } + this.retries++; + this.addEventListeners(); + this.connection.connect(); } log(entry: string, callback: Function) { diff --git a/src/winston-logstash-latest.ts b/src/winston-logstash-latest.ts index 1f4a63e..e6af6a9 100644 --- a/src/winston-logstash-latest.ts +++ b/src/winston-logstash-latest.ts @@ -8,6 +8,7 @@ import Transport from "winston-transport"; import { Manager } from './manager'; import { LogstashTransportOptions } from "./types"; +import { IConnection, PlainConnection, SecureConnection } from "./connection"; // // Inherit from `winston-transport` so you can take advantage @@ -15,13 +16,15 @@ import { LogstashTransportOptions } from "./types"; // module.exports = class LogstashTransport extends Transport { private manager: Manager; + private connection: IConnection; public name: string; constructor(options: LogstashTransportOptions) { super(options); this.name = 'logstash'; - this.manager = new Manager(options); + this.connection = options.ssl_enable ? new SecureConnection(options) : new PlainConnection(options); + this.manager = new Manager(options, this.connection); this.manager.on('error', this.onError.bind(this)); this.manager.start(); } diff --git a/src/winston-logstash.ts b/src/winston-logstash.ts index 5046f60..46bb100 100644 --- a/src/winston-logstash.ts +++ b/src/winston-logstash.ts @@ -8,6 +8,7 @@ import {Transport} from "winston"; const common = require('winston/lib/winston/common'); import { Manager } from './manager'; import { LogstashTransportOptions } from "./types"; +import { IConnection, PlainConnection, SecureConnection } from "./connection"; export class Logstash extends Transport { private node_name: string @@ -15,6 +16,7 @@ export class Logstash extends Transport { private label: string private meta_defaults: object private manager: Manager + private connection: IConnection; constructor(options: LogstashTransportOptions) { super(options); @@ -25,7 +27,8 @@ export class Logstash extends Transport { this.label = options.label || this.node_name; this.meta_defaults = Object.assign({}, options.meta); - this.manager = new Manager(options); + this.connection = options.ssl_enable ? new SecureConnection(options) : new PlainConnection(options); + this.manager = new Manager(options, this.connection); this.manager.on('error', this.onError.bind(this)); this.manager.start(); } diff --git a/test-bench/winston-2x/jest.config.js b/test-bench/winston-2x/jest.config.js index 83b1811..b6a9ac8 100644 --- a/test-bench/winston-2x/jest.config.js +++ b/test-bench/winston-2x/jest.config.js @@ -31,7 +31,7 @@ module.exports = { // ], // Indicates which provider should be used to instrument code for coverage - coverageProvider: "v8", + coverageProvider: 'v8', // A list of reporter names that Jest uses when writing coverage reports // coverageReporters: [ @@ -154,12 +154,12 @@ module.exports = { // The glob patterns Jest uses to detect test files testMatch: [ - "**/test/*_test.js" + '**/test/*_test.js', ], // An array of regexp pattern strings that are matched against all test paths, matched tests are skipped testPathIgnorePatterns: [ - "/node_modules/" + '/node_modules/', ], // The regexp pattern or array of patterns that Jest uses to detect test files diff --git a/test-bench/winston-3x/jest.config.js b/test-bench/winston-3x/jest.config.js index 83b1811..b6a9ac8 100644 --- a/test-bench/winston-3x/jest.config.js +++ b/test-bench/winston-3x/jest.config.js @@ -31,7 +31,7 @@ module.exports = { // ], // Indicates which provider should be used to instrument code for coverage - coverageProvider: "v8", + coverageProvider: 'v8', // A list of reporter names that Jest uses when writing coverage reports // coverageReporters: [ @@ -154,12 +154,12 @@ module.exports = { // The glob patterns Jest uses to detect test files testMatch: [ - "**/test/*_test.js" + '**/test/*_test.js', ], // An array of regexp pattern strings that are matched against all test paths, matched tests are skipped testPathIgnorePatterns: [ - "/node_modules/" + '/node_modules/', ], // The regexp pattern or array of patterns that Jest uses to detect test files diff --git a/test/test_helper.ts b/test/test_helper.ts index 395ff35..f459211 100644 --- a/test/test_helper.ts +++ b/test/test_helper.ts @@ -4,7 +4,7 @@ import { readFileSync } from 'fs'; import timekeeper from 'timekeeper'; import winston, { LoggerInstance } from 'winston'; -export const sslFilePath = (filename: string) => (__dirname + '/../test/support/ssl/' + filename) +export const sslFilePath = (filename: string) => (__dirname + '/support/ssl/' + filename) const freezedTime = new Date(1330688329321); const port = 28777;