Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Commit

Permalink
Empty retained messages should delete the record.
Browse files Browse the repository at this point in the history
Closes #98.
  • Loading branch information
mcollina committed Apr 17, 2014
1 parent 95c1275 commit 86924fe
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 20 deletions.
6 changes: 5 additions & 1 deletion lib/persistence/levelup.js
Expand Up @@ -106,7 +106,11 @@ util.inherits(LevelUpPersistence, AbstractPersistence);
*/

LevelUpPersistence.prototype.storeRetained = function(packet, cb) {
this._retained.put(packet.topic, packet, cb);
if (packet.payload.length > 0) {
this._retained.put(packet.topic, packet, cb);
} else {
this._retained.del(packet.topic, cb);
}
};

LevelUpPersistence.prototype.lookupRetained = function(pattern, cb) {
Expand Down
41 changes: 23 additions & 18 deletions lib/persistence/mongo.js
Expand Up @@ -194,24 +194,29 @@ MongoPersistence.prototype.lookupSubscriptions = function(client, done) {
};

MongoPersistence.prototype.storeRetained = function(packet, cb) {

this._retained.findAndModify(
{ topic: packet.topic },
[],
packet,
{
upsert: true,
new: true
},
function(err, result){
if(!err) {
packet._id = result._id;
}
if(cb) {
return cb(err);
}
});

if (packet.payload.length > 0) {
this._retained.findAndModify(
{ topic: packet.topic },
[],
packet,
{
upsert: true,
new: true
},
function(err, result){
if(!err) {
packet._id = result._id;
}
if(cb) {
return cb(err);
}
});
} else {
this._retained.remove(
{ topic: packet.topic },
{ w: 1 },
cb);
}
};

MongoPersistence.prototype.lookupRetained = function(pattern, cb) {
Expand Down
6 changes: 5 additions & 1 deletion lib/persistence/redis.js
Expand Up @@ -151,7 +151,11 @@ RedisPersistence.prototype._buildClient = function() {
};

RedisPersistence.prototype.storeRetained = function(packet, cb) {
this._client.hset("retained", packet.topic, JSON.stringify(packet), cb);
if (packet.payload.length > 0) {
this._client.hset("retained", packet.topic, JSON.stringify(packet), cb);
} else {
this._client.hdel("retained", packet.topic, cb);
}
};

RedisPersistence.prototype.lookupRetained = function(pattern, done) {
Expand Down
31 changes: 31 additions & 0 deletions test/persistence/abstract.js
Expand Up @@ -149,6 +149,37 @@ module.exports = function(create, buildOpts) {
], done);
});

it("should remove a retained message if the payload is empty", function(done) {
var packet = {
topic: "hello",
qos: 0,
payload: new Buffer("world"),
messageId: 42,
retain: true
};

var packet2 = {
topic: "hello",
qos: 0,
payload: new Buffer(0),
messageId: 43,
retain: true
};

var instance = this.instance;

async.series([
instance.storeRetained.bind(instance, packet),
instance.storeRetained.bind(instance, packet2),
function(cb) {
instance.lookupRetained("hello", function(err, results) {
expect(results).to.have.property("length", 0);
cb();
});
}
], done);
});

it("should match and load with a 'some' pattern", function(done) {
var packet1 = {
topic: "hello/1",
Expand Down

0 comments on commit 86924fe

Please sign in to comment.