Skip to content

Commit

Permalink
connection: use ping-pong instead of heartbeat
Browse files Browse the repository at this point in the history
* Send Ping-Pong instead of Heartbeat messages.
* Use setInterval() instead of timers.enroll().
* Rewrite tests.

Fixes: #217
PR-URL: #303
Reviewed-By: Alexey Orlenko <eaglexrlnk@gmail.com>
Reviewed-By: Mykola Bilochub <nbelochub@gmail.com>
  • Loading branch information
nechaido committed Nov 24, 2017
1 parent b1b59f7 commit 37918fe
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 55 deletions.
64 changes: 19 additions & 45 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

const { EventEmitter } = require('events');
const semver = require('semver');
const timers = require('timers');

const common = require('./common');
const serde = require('./serde');
Expand Down Expand Up @@ -53,18 +52,15 @@ class Connection extends EventEmitter {
this.application = null;
this.remoteProxies = {};

this._heartbeatCallbackInstance = null;
this._heartbeatTimer = null;
this._closed = false;

// Defined in constructor to be used as default callback in callMethod
// without binding it.
this._emitError = (error) => {
if (error) this.emit('error', error);
};

// Defined in constructor to be used as heartbeat message
// in debug mode events
this._heartbeatMessage = {};

transport.on('message', this._processMessage.bind(this));
transport.on('close', this._onSocketClose.bind(this));
transport.on('error', this._onSocketError.bind(this));
Expand Down Expand Up @@ -240,34 +236,33 @@ class Connection extends EventEmitter {

// Send a pong message
//
pong(messageId) {
_pong(messageId) {
if (this._closed) {
return;
}
const message = { pong: [messageId] };
this._send(this._prepareMessage(message), message);
}

// Start sending heartbeat messages
// Start sending ping messages
// interval - heartbeat interval in milliseconds
//
startHeartbeat(interval) {
const callback = () => {
this.transport.send('{}');
this.setTimeout(interval, this._heartbeatCallbackInstance);

if (process.env.NODE_ENV !== 'production') {
this.emit('heartbeat', this._heartbeatMessage);
const heartbeat = () => {
if (!this._closed) {
this.ping();
}
};

this._heartbeatCallbackInstance = callback;
callback();
this._heartbeatTimer = setInterval(heartbeat, interval);
}

// Stop sending heartbeat messages
// Stop sending ping messages
//
stopHeartbeat() {
if (this._heartbeatCallbackInstance) {
this.clearTimeout(this._heartbeatCallbackInstance);
this._heartbeatCallbackInstance = null;
if (this._heartbeatTimer) {
clearTimeout(this._heartbeatTimer);
this._heartbeatTimer = null;
}
}

Expand Down Expand Up @@ -320,40 +315,17 @@ class Connection extends EventEmitter {
// Close the connection
//
close() {
this._closed = true;
this.stopHeartbeat();
this.transport.end();
}

// Set a timeout using timers.enroll()
// milliseconds - amount of milliseconds
// callback - callback function
//
setTimeout(milliseconds, callback) {
timers.enroll(this, milliseconds);
timers._unrefActive(this);
this.once('_timeout', callback);
}

// Clear a timeout set with Connection#setTimeout
// handler - timer callback to remove
//
clearTimeout(handler) {
timers.unenroll(this);
this.removeListener('_timeout', handler);
}

// Returns underlying transport
//
getTransport() {
return this.transport.getRawTransport();
}

// timers.enroll() timeout handler
//
_onTimeout() {
this.emit('_timeout');
}

// Prepare a JSTP message to be sent over this connection
// message - a message to prepare
//
Expand Down Expand Up @@ -383,6 +355,7 @@ class Connection extends EventEmitter {
// message - a message to send (optional)
//
_end(message) {
this._closed = true;
this.stopHeartbeat();

if (message) {
Expand All @@ -400,6 +373,7 @@ class Connection extends EventEmitter {
// Closed socket event handler
//
_onSocketClose() {
this._closed = true;
this.stopHeartbeat();
this.emit('close', this);
if (this.server) {
Expand Down Expand Up @@ -705,7 +679,7 @@ class Connection extends EventEmitter {
//
_processPingMessage(message) {
const messageId = message.ping[0];
this.pong(messageId);
this._pong(messageId);
this.session._onMessageRecieved(messageId);
}

Expand Down
4 changes: 2 additions & 2 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,10 @@ class Server {
}
};

connection.setTimeout(HANDSHAKE_TIMEOUT, handleTimeout);
const handshakeTimeout = setTimeout(handleTimeout, HANDSHAKE_TIMEOUT);

connection.on('client', () => {
connection.clearTimeout(handleTimeout);
clearTimeout(handshakeTimeout);
if (this.heartbeatInterval) {
connection.startHeartbeat(this.heartbeatInterval);
}
Expand Down
35 changes: 29 additions & 6 deletions test/node/connection-emit-actions.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ const jstp = require('../..');

const app = require('../fixtures/application');

const HEARTBEAT_INTERVAL = 100;

const application = new jstp.Application(app.name, app.interfaces);
const serverConfig = {
applications: [application],
authPolicy: app.authCallback,
heartbeatInterval: HEARTBEAT_INTERVAL,
};

let server;
Expand Down Expand Up @@ -191,14 +194,34 @@ test.test('must emit messages in development mode', (test) => {
});

test.test('must emit heartbeat messages in development mode', (test) => {
test.plan(2);
test.plan(4);
const received = {
serverPing: false,
serverPong: false,
clientPing: false,
clientPong: false,
};

server.getClientsArray()[0].on('heartbeat', (message) => {
test.strictSame(message, {}, 'heartbeat message must match on server side');
server.getClientsArray()[0].on('incomingMessage', (message) => {
if (message.ping !== undefined) {
received.serverPing = true;
} else if (message.pong !== undefined) {
received.serverPong = true;
}
});
connection.on('heartbeat', (message) => {
test.strictSame(message, {}, 'heartbeat message must match on client side');
connection.on('incomingMessage', (message) => {
if (message.ping !== undefined) {
received.clientPing = true;
} else if (message.pong !== undefined) {
received.clientPong = true;
}
});

connection.startHeartbeat(100);
connection.startHeartbeat(HEARTBEAT_INTERVAL);
setTimeout(() => {
test.assert(received.serverPing, 'server must receive ping message');
test.assert(received.serverPong, 'server must receive pong message');
test.assert(received.clientPing, 'client must receive ping message');
test.assert(received.clientPing, 'client must receive pong message');
}, 2 * HEARTBEAT_INTERVAL);
});
6 changes: 4 additions & 2 deletions test/node/regress-gh-283.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ server.listen(() => {

let heartbeatsCount = 0;

connection.on('heartbeat', () => {
heartbeatsCount++;
connection.on('incomingMessage', (message) => {
if (message.ping !== undefined) {
heartbeatsCount++;
}
});

setTimeout(() => {
Expand Down

0 comments on commit 37918fe

Please sign in to comment.