Skip to content
This repository has been archived by the owner on Apr 19, 2023. It is now read-only.

Commit

Permalink
Implement heartbeat-based EventSource connection reset (#299)
Browse files Browse the repository at this point in the history
This should address #156 by creating a heartbeat timer that will reset the SSE connection to particle if a certain number of messages are not received every heartbeat interval.
  • Loading branch information
suyashkumar committed Aug 23, 2018
1 parent 01bc4ba commit 192b59c
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 23 deletions.
50 changes: 36 additions & 14 deletions software/cloud-dashboard/node-server/handle-device.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,66 @@
* notifications to clients using socketio when new packages arrive.
* @author: Suyash Kumar <suyashkumar2003@gmail.com>
*/
var EventSource = require('eventsource'); // Pull in event source
var MessageUtil = require('./handle-device/message-util');
let EventSource = require('eventsource'); // Pull in event source
let MessageUtil = require('./handle-device/message-util');

var EventSourceRestartTime = 10000; // in ms
let EventSourceRestartTime = 10000; // in ms

var es; // Instance of EventSource
let es; // Global instance of EventSource

let receivedSinceLastHeartbeat = 0;
const heartbeatInterval = 300000; // in ms
const heartbeatMessageCutoff = 1; // minimum messages that must be received per heartbeatInterval

function handleDataMessage(message, io) {
try {
const parsedData = MessageUtil.parseMessage(message);
MessageUtil.addRecords(parsedData, io);
receivedSinceLastHeartbeat++;
} catch (err) {
console.log("ERROR: Parsing DATA Message", message, err);
}
}

function handleStream(deviceUrl, io){
function checkReceivedMessagesHeartbeat(deviceUrl, io) {
if (receivedSinceLastHeartbeat < heartbeatMessageCutoff) {
es.close();
setTimeout(handleStream, EventSourceRestartTime, deviceUrl, io);
console.log("ERROR: Received messages since last heartbeat below threshold, restarting EventSource.");
return;
}

// Reset receivedSinceLastHearbeat and schedule another call to checkReceivedMessagesHeartbeat
receivedSinceLastHeartbeat = 0;
setTimeout(checkReceivedMessagesHeartbeat, heartbeatInterval, deviceUrl, io);
}

function handleStream(deviceUrl, io) {
es = new EventSource(deviceUrl); // Listen to the stream

// Add temperature event listener (DATA event on devices with latest firmware)
es.addEventListener("TEMPS", (message) => {
es.addEventListener("TEMPS", message => {
handleDataMessage(message, io);
});

// Add new DATA event listener
es.addEventListener("DATA", (message) => {
es.addEventListener("DATA", message => {
handleDataMessage(message, io);
});

es.onerror = function (err) {
console.log("ERROR (Likely Event Source)");
console.log(err);
es.close()
setTimeout(handleStream, EventSourceRestartTime, deviceUrl, io); // Manual reinit of eventsource in error cases
};
es.onerror = err => {
console.log("ERROR (Likely Event Source)");
console.log(err);
es.close();
setTimeout(handleStream, EventSourceRestartTime, deviceUrl, io); // Manual reinit of eventsource in error cases
};

// Schedule the initial call to checkReceivedMessagesHeartbeat
setTimeout(checkReceivedMessagesHeartbeat, heartbeatInterval, deviceUrl, io);

}

module.exports = {
handleStream, // Export handleDevice function
handleDataMessage,
}
};
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,4 @@ function parseMessage(message) {
module.exports = {
parseMessage,
addRecords
}
};
4 changes: 2 additions & 2 deletions software/cloud-dashboard/node-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"twilio": "^2.9.1"
},
"devDependencies": {
"jest": "^18.1.0",
"sinon": "^1.17.7"
"jest": "^18.1.0",
"sinon": "^6.1.4"
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
var MessageUtil = require('../../handle-device/message-util');
let testData = require('./test-data');
let sinon = require('sinon');

var handleDevice = require('../../handle-device');
var testData = require('./test-data');
var sinon = require('sinon');

describe('Temperature Event Publish', () => {
describe('Temperature Event Publish', () => {
let MessageUtil = require('../../handle-device/message-util');
let handleDevice = require('../../handle-device');

const parseMessageStub = sinon.stub(MessageUtil, 'parseMessage');
const addRecordsStub = sinon.stub(MessageUtil, 'addRecords');
handleDevice.handleDataMessage(testData.sampleTempEvent, {});
Expand All @@ -15,5 +16,7 @@ describe('Temperature Event Publish', () => {
it('called addRecords', () => {
expect(addRecordsStub.called).toBe(true);
});
sinon.restore();
});


Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
var testData = require('./test-data');
var sinon = require('sinon');

var MessageUtil = require('../../handle-device/message-util');

Expand Down

0 comments on commit 192b59c

Please sign in to comment.