Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Commit

Permalink
Merge pull request #642 from btsimonh/btsimonh-qos-2-mk2
Browse files Browse the repository at this point in the history
Add option 'qos2Puback' -easy merge
  • Loading branch information
mcollina committed May 19, 2017
2 parents c4303c5 + 32f0714 commit 19b6d25
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 4 deletions.
1 change: 1 addition & 0 deletions examples/Server_With_All_Interfaces-Settings.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ var moscaSetting = {
{ type: "https", port: 3001, bundle: true, credentials: { keyPath: SECURE_KEY, certPath: SECURE_CERT } }
],
stats: false,
onQoS2publish: 'noack', // can set to 'disconnect', or to 'dropToQoS1' if using a client which will eat puback for QOS 2; e.g. mqtt.js

logger: { name: 'MoscaServer', level: 'debug' },

Expand Down
25 changes: 23 additions & 2 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -535,15 +535,35 @@ Client.prototype.handleAuthorizePublish = function(err, success, packet) {
packet.payload = success;
}

// Mosca does not support QoS2
// if onQoS2publish === 'dropToQoS1', don't just ignore QoS2 message, puback it
// by converting internally to qos 1.
// this fools mqtt.js into not holding all messages forever
// if onQoS2publish === 'disconnect', then break the client connection if QoS2
if (packet.qos === 2){
switch(that.server.onQoS2publish){
case 'dropToQoS1':
packet.qos = 1;
break;
case 'disconnect':
if (!this._closed && !this._closing) {
that.close(null, "qos2 caused disconnect");
}
return;
break;
default:
break;
}
}

var dopuback = function() {
if (packet.qos === 1 && !(that._closed || that._closing)) {
that.connection.puback({
messageId: packet.messageId
});
}
};



// if success is passed as 'ignore', ack but don't publish.
if (success !== 'ignore'){
// publish message
Expand All @@ -552,6 +572,7 @@ Client.prototype.handleAuthorizePublish = function(err, success, packet) {
// ignore but acknowledge message
dopuback();
}

};

/**
Expand Down
8 changes: 6 additions & 2 deletions lib/options.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ function modernize(legacy) {
"stats",
"publishNewClient",
"publishClientDisconnect",
"publishSubscriptions"
"publishSubscriptions",
"onQoS2publish"
];

// copy all conserved options
Expand Down Expand Up @@ -252,7 +253,8 @@ function validate(opts, validationOptions) {
'stats': { type: 'boolean' },
'publishNewClient': { type: 'boolean' },
'publishClientDisconnect': { type: 'boolean' },
'publishSubscriptions': { type: 'boolean' }
'publishSubscriptions': { type: 'boolean' },
'onQoS2publish': { type: 'string' },
}
});

Expand Down Expand Up @@ -330,6 +332,7 @@ function defaultsLegacy() {
publishClientDisconnect: true,
publishSubscriptions: true,
maxInflightMessages: 1024,
onQoS2publish: 'noack',
logger: {
name: "mosca",
level: "warn",
Expand Down Expand Up @@ -366,6 +369,7 @@ function defaultsModern() {
publishClientDisconnect: true,
publishSubscriptions: true,
maxInflightMessages: 1024,
onQoS2publish: 'noack',
logger: {
name: "mosca",
level: "warn",
Expand Down
3 changes: 3 additions & 0 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ function Server(opts, callback) {

var that = this;

// put QOS-2 spoofing as a variable direct on server
this.onQoS2publish = this.modernOpts.onQoS2publish;

// each Server has a dummy id for logging purposes
this.id = this.modernOpts.id || shortid.generate();

Expand Down
133 changes: 133 additions & 0 deletions test/abstract_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -956,6 +956,139 @@ module.exports = function(moscaSettings, createConnection) {
});
});

it("should by default not puback client publish to QOS 2", function(done) {
var onPublishedCalled = false;
var clientId;
var count = 0;
var timer;

instance.published = function(packet, serverClient, callback) {
onPublishedCalled = true;
expect(packet.topic).to.be.equal("testQOS2");
callback();
};

buildAndConnect(done, function(client) {
clientId = client.opts.clientId;

client.publish({
messageId: 42,
topic: "testQOS2",
payload: "publish expected",
qos: 2
});

// allow 1 second to hear puback
timer = setTimeout(function(){
client.disconnect();
}, 1000);

// default QOS 2 should NOT puback
client.on("puback", function() {
count++;
//expect(count).to.eql(1);
client.disconnect();
});
client.on("close", function() {
expect(count).to.eql(0);
client.disconnect();
clearTimeout(timer);
});
});
});


it("should optionally (onQoS2publish='dropToQoS1') puback client publish to QOS 2", function(done) {
var onPublishedCalled = false;
var clientId;
var count = 0;
var timer;

instance.onQoS2publish = 'dropToQoS1';
instance.published = function(packet, serverClient, callback) {
onPublishedCalled = true;
expect(packet.topic).to.be.equal("testQOS2");
callback();
};

buildAndConnect(done, function(client) {
clientId = client.opts.clientId;

client.publish({
messageId: 42,
topic: "testQOS2",
payload: "publish expected",
qos: 2
});

// allow 1 second to hear puback
timer = setTimeout(function(){
client.disconnect();
}, 1000);

// with maxqos=1, QOS 2 should puback
client.on("puback", function() {
count++;
expect(count).to.eql(1);
client.disconnect();
});
client.on("close", function() {
expect(count).to.eql(1);
client.disconnect();
clearTimeout(timer);
});
});
});

it("should optionally (onQoS2publish='disconnect') disconnect client on publish of QOS2 message", function(done) {
var onPublishedCalled = false;
var clientId;
var count = 0;
var timer;

instance.onQoS2publish = 'disconnect';
instance.published = function(packet, serverClient, callback) {
onPublishedCalled = true;
expect(packet.topic).to.be.equal("should not have published");
callback();
};

buildAndConnect(done, function(client) {
clientId = client.opts.clientId;

client.publish({
messageId: 42,
topic: "QOS2Test",
payload: "some data to cause close",
qos: 2
});

// if after 2 seconds, we've not closed
timer = setTimeout(function(){
var test = false;
expect(count).to.eql(0);
expect(test).to.eql(true);
client.disconnect();
}, 2000);

// onQoS2publish = 'disconnect' should NOT puback
client.on("puback", function() {
expect(onPublishedCalled).to.eql(false);
count++;
expect(count).to.eql(0);
client.disconnect();
});
client.on("close", function() {
expect(onPublishedCalled).to.eql(false);
expect(count).to.eql(0);
client.disconnect();
clearTimeout(timer);
});
});
});



it("should emit an event when a new client is connected", function(done) {
buildClient(done, function(client) {

Expand Down

0 comments on commit 19b6d25

Please sign in to comment.