Skip to content
This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

Commit

Permalink
Merge 79a6a75 into a3589b5
Browse files Browse the repository at this point in the history
  • Loading branch information
oshoemaker committed Mar 18, 2015
2 parents a3589b5 + 79a6a75 commit 4a241cf
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 4 deletions.
32 changes: 28 additions & 4 deletions lib/mqtt_ascoltatore.js
Expand Up @@ -6,6 +6,7 @@ var TrieAscoltatore = require("./trie_ascoltatore");
var AbstractAscoltatore = require('./abstract_ascoltatore');
var debug = require("debug")("ascoltatori:mqtt");
var SubsCounter = require("./subs_counter");
var async = require("async");

/**
* MQTTAscoltatore is a class that inherits from AbstractAscoltatore.
Expand Down Expand Up @@ -50,9 +51,10 @@ MQTTAscoltatore.prototype = Object.create(AbstractAscoltatore.prototype);
* @api private
*/
MQTTAscoltatore.prototype._startConn = function() {
var that = this,
url = that._opts.url,
settings = null;
var that = this;
var url = that._opts.url;
var settings = null;

if (this._client === undefined) {
debug("connecting..");
if (url) {
Expand All @@ -64,7 +66,9 @@ MQTTAscoltatore.prototype._startConn = function() {
this._client.setMaxListeners(0);
this._client.on("connect", function() {
debug("connected");
that.emit("ready");
that.reconnectTopics(function(){
that.emit("ready");
});
});
this._client.on("message", function(topic, payload, packet) {
debug("received new packet on topic " + topic);
Expand All @@ -84,6 +88,26 @@ MQTTAscoltatore.prototype._startConn = function() {
return this._client;
};

MQTTAscoltatore.prototype.reconnectTopics = function reconnectTopics(cb) {
var that = this;

var subscribedTopics = that._subs_counter.keys();

var opts = {
qos: 1
};

async.each(subscribedTopics, function(topic, callback) {
that._client.subscribe(topic, opts, function() {
debug("re-registered subscriber for topic " + topic);
callback();
});
}, function(){
cb();
});

};

MQTTAscoltatore.prototype.subscribe = function subscribe(topic, callback, done) {
this._raiseIfClosed();

Expand Down
71 changes: 71 additions & 0 deletions test/mqtt_ascoltatore_spec.js
@@ -1,3 +1,4 @@

describeAscoltatore("MQTT", function() {

afterEach(function(done) {
Expand Down Expand Up @@ -38,4 +39,74 @@ describeAscoltatore("MQTT", function() {
});
that.instance.publish("hello/123", "42", { qos: 0 });
});

});

describe("MQTT Reconnect Test", function() {
it("should re-subscribe to topics", function(done) {
this.timeout(3000); // Set the test timeout to 3s

var that = this;
var mosca = require("mosca");
var msgReceived = false;

var moscaOpts = {
port: 6884,
stats: false,
logger: {
level: "fatal"
}
};

var clientOpts = {
json: false,
mqtt: require("mqtt"),
host: "127.0.0.1",
port: 6884
};

var mqttTestServer = new mosca.Server(moscaOpts);
var newClient = new ascoltatori.MQTTAscoltatore(clientOpts);

async.series([
function(cb) {
newClient.once('ready',cb);
},

function(cb) {
// Subscribe to topic for test
newClient.subscribe('reconnect/test', function() {
newClient.emit('success');
}, cb);
},

// Stop the MQTT server
function(cb) {
mqttTestServer.close(cb);
},

// Start the MQTT server
function(cb) {
mqttTestServer = new mosca.Server(moscaOpts, cb);
},

// Setup listener and send message
function(cb) {
newClient.once('success', function() {
msgReceived = true;
cb();
});

newClient.once('ready', function(){
newClient.publish('reconnect/test', 'blah');
});
},

], function() {
if (msgReceived) {
done();
}
});

});
});

0 comments on commit 4a241cf

Please sign in to comment.