Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Commit

Permalink
Removing subscriptions during lookupSubscriptions
Browse files Browse the repository at this point in the history
Otherwise we will queue offline messages when they are not due.
  • Loading branch information
mcollina committed Dec 14, 2013
1 parent 07ccdae commit 1cf36c5
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 31 deletions.
32 changes: 15 additions & 17 deletions lib/persistence/levelup.js
Expand Up @@ -140,30 +140,28 @@ var nop = function() {};
LevelUpPersistence.prototype.lookupSubscriptions = function(client, done) {
var that = this;
this._clientSubscriptions.get(client.id, function(err, subscriptions) {
if (subscriptions && client.clean) {
that._clientSubscriptions.del(client.id, function() {
that._clientSubscriptions.del(client.id, function() {
if (subscriptions && client.clean) {
that.streamOfflinePackets(client, nop, function() {

Object.keys(subscriptions).forEach(function(key) {
that._subscriptions.batch(Object.keys(subscriptions).map(function(key) {
var levelKey = util.format("%s:%s", key, client.id);
that._subLobber.remove(key, levelKey);
that._subscriptions.del(levelKey);
return {
key: levelKey,
type: 'del'
};
}), function(err) {
done(err, {});
});

if (done) {
done(null, {});
}
});
});
} else {
if (!subscriptions) {
subscriptions = {};
}
} else {
subscriptions = subscriptions || {};

if (done) {
done(null, subscriptions);
if (done) {
done(null, subscriptions);
}
}
}
});
});
};

Expand Down
32 changes: 20 additions & 12 deletions lib/persistence/mongo.js
Expand Up @@ -134,19 +134,27 @@ MongoPersistence.prototype.storeSubscriptions = function(client, done) {

MongoPersistence.prototype.lookupSubscriptions = function(client, done) {
var that = this;
if (client.clean) {
async.parallel([
this._subscriptions.remove.bind(this._subscriptions, { client: client.id }),
this._packets.remove.bind(this._packets, { client: client.id }),
], function(err) {
done(err, {});
});
} else {
this._subscriptions.find({ client: client.id })
.toArray(function(err, subscriptions) {
this._subscriptions.find({ client: client.id })
.toArray(function(err, subscriptions) {

var toExecute = [
function removeSubscriptions(cb) {
that._subscriptions.remove({ client: client.id }, cb);
}
];

if (client.clean) {
subscriptions = [];
toExecute.unshift(function removePackets(cb) {
that._packets.remove({ client: client.id }, cb);
});
}

subscriptions = subscriptions || [];

async.parallel(toExecute, function(err) {
var now = Date.now();
done(err, (subscriptions || []).reduce(function(obj, sub) {
done(err, subscriptions.reduce(function(obj, sub) {
// mongodb TTL is not precise
if (sub.added.getTime() + that.options.ttl.subscriptions > now) {
obj[sub.topic] = {
Expand All @@ -156,7 +164,7 @@ MongoPersistence.prototype.lookupSubscriptions = function(client, done) {
return obj;
}, {}));
});
}
});
};

MongoPersistence.prototype.storeRetained = function(packet, cb) {
Expand Down
15 changes: 13 additions & 2 deletions lib/persistence/redis.js
Expand Up @@ -240,8 +240,19 @@ RedisPersistence.prototype.lookupSubscriptions = function(client, cb) {
return;
}

this._client.get("client:sub:" + client.id, function(err, result) {
cb(err, JSON.parse(result) || {});
var key = "client:sub:" + client.id;
var subscriptions;

var multi = this._client.multi()

multi.get(key, function(err, result) {
subscriptions = JSON.parse(result) || {};
});

multi.del(key);

multi.exec(function(err) {
cb(err, subscriptions);
});
};

Expand Down
22 changes: 22 additions & 0 deletions test/persistence/abstract.js
Expand Up @@ -310,6 +310,28 @@ module.exports = function(create, buildOpts) {
});
});

it("should remove the subscriptions after lookup", function(done) {
var instance = this.instance;
var client = {
id: "my client id - 42",
logger: globalLogger,
subscriptions: {
hello: {
qos: 1
}
}
};

instance.storeSubscriptions(client, function() {
instance.lookupSubscriptions(client, function() {
instance.lookupSubscriptions(client, function(err, results) {
expect(results).to.eql({});
done();
});
});
});
});

it("should allow a clean client to connect", function(done) {
var instance = this.instance;
var client = {
Expand Down

0 comments on commit 1cf36c5

Please sign in to comment.