Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Commit

Permalink
Merge pull request #545 from happybin92/master
Browse files Browse the repository at this point in the history
Fixbug (subscribes with same topic, may lead to the ascoltatore added multi cbs, and can't unsubscribe)
  • Loading branch information
mcollina committed Sep 28, 2016
2 parents d284a97 + e230f86 commit a0a068f
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 12 deletions.
30 changes: 18 additions & 12 deletions lib/client.js
Expand Up @@ -443,19 +443,25 @@ Client.prototype.handleAuthorizeSubscribe = function(err, success, s, cb) {
that.forward(topic, payload, options, s.topic, s.qos);
};

this.server.ascoltatore.subscribe(
s.topic,
handler,
function(err) {
if (err) {
cb(err);
return;
if (this.subscriptions[s.topic] === undefined) {
this.subscriptions[s.topic] = { qos: s.qos, handler: handler };
this.server.ascoltatore.subscribe(
s.topic,
handler,
function(err) {
if (err) {
delete that.subscriptions[s.topic];
cb(err);
return;
}
that.logger.info({ topic: s.topic, qos: s.qos }, "subscribed to topic");
//that.subscriptions[s.topic] = { qos: s.qos, handler: handler };
cb(null, true);
}
that.logger.info({ topic: s.topic, qos: s.qos }, "subscribed to topic");
that.subscriptions[s.topic] = { qos: s.qos, handler: handler };
cb(null, true);
}
);
);
} else {
cb(null, true);
}
};

function handleEachSub (s, cb) {
Expand Down
55 changes: 55 additions & 0 deletions test/server.js
Expand Up @@ -169,6 +169,61 @@ describe("mosca.Server", function() {
});
});

it("should not receive the publish after unsubscription, while multi subscriptions with the same topic", function(done) {

// Simulate a situation that it takes same time to do authorizeSubscribe.
this.instance.authorizeSubscribe = function(client, topic, callback) {
setTimeout(function(){
callback(null, true)
}, 300);
};

buildAndConnect(function(){}, this.instance, function(client) {
function subAction(){
var messageId = Math.floor(65535 * Math.random());
client.subscribe({
subscriptions: [{topic: "hello", qos: 1 }],
messageId: messageId
});
}

var subCount = 3; // subscribe the same topic for 3 times
for (var i = 0; i < subCount; ++i)
subAction();

var subackCount = 0;
client.on("suback", function() { // unsubscribe after subscriptions
subackCount++;
if (subackCount == subCount) {
var messageId = Math.floor(65535 * Math.random());
client.unsubscribe({
unsubscriptions: ["hello"],
messageId: messageId
});
}
});

client.on("unsuback", function() { // publish message after unsubscription
var messageId = Math.floor(65535 * Math.random());
client.publish({
topic: "hello",
payload: "some data",
messageId: messageId,
qos: 1
});
});

client.on("publish", function(packet) { // should not receive the publish
done(new Error("unexpected publish"));
});

client.on("puback", function(packet) { // close client when puback
client.disconnect();
done();
});
});
});

it("should fail if persistence can not connect", function (done) {
var newSettings = moscaSettings();

Expand Down

0 comments on commit a0a068f

Please sign in to comment.