Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Commit

Permalink
Merge a6ffa75 into c76b680
Browse files Browse the repository at this point in the history
  • Loading branch information
behrad committed Apr 14, 2016
2 parents c76b680 + a6ffa75 commit 17e209f
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 16 deletions.
9 changes: 3 additions & 6 deletions lib/persistence/levelup.js
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ LevelUpPersistence.prototype.lookupRetained = function(pattern, cb) {
});

stream.on("data", function(data) {
if (matcher.match(data.key).length > 0) {
if (matcher.match(data.key).size > 0) {
matched.push(data.value);
}
});
Expand All @@ -176,9 +176,7 @@ LevelUpPersistence.prototype.storeSubscriptions = function(client, done) {
qos: subscriptions[key].qos
};
var levelKey = util.format("%s:%s", key, client.id);
if (that._subMatcher.match(key).indexOf(levelKey) < 0) {
that._subMatcher.add(key, levelKey);
}
that._subMatcher.add(key, levelKey);
that._subscriptions.put(levelKey, sub);
});
} else if (done) {
Expand Down Expand Up @@ -232,8 +230,7 @@ LevelUpPersistence.prototype.lookupSubscriptions = function(client, done) {
LevelUpPersistence.prototype.storeOfflinePacket = function(packet, done) {
var that = this;
var subs = this._subMatcher.match(packet.topic);

async.each(subs, function(key, cb) {
async.each(Array.from(subs), function(key, cb) {
that._subscriptions.get(key, function(err, sub) {
if (err) {
return cb(err);
Expand Down
2 changes: 1 addition & 1 deletion lib/persistence/matcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.
*/

var Qlobber = require("qlobber").Qlobber;
var Qlobber = require("qlobber").QlobberDedup;
var util = require("util");

function Matcher() {
Expand Down
2 changes: 1 addition & 1 deletion lib/persistence/mongo.js
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ MongoPersistence.prototype.lookupRetained = function(pattern, cb) {
});

stream.on("data", function(data) {
if (matcher.match(data.topic).length > 0) {
if (matcher.match(data.topic).size > 0) {
data.payload = data.payload.buffer;
matched.push(data);
}
Expand Down
12 changes: 4 additions & 8 deletions lib/persistence/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,7 @@ function RedisPersistence(options, callback) {
}

Object.keys(subs).forEach(function(sub) {
if (that._subMatcher.match(sub).indexOf(id) < 0) {
that._subMatcher.add(sub, id);
}
that._subMatcher.add(sub, id);
});

if( unsubs ) {
Expand Down Expand Up @@ -229,7 +227,7 @@ RedisPersistence.prototype.lookupRetained = function(pattern, done) {
this._client.hkeys("retained", function(err, topics) {
topics.sort();
topics = topics.filter(function(topic) {
return matcher.match(topic).length > 0;
return matcher.match(topic).size > 0;
});

async.each(topics, match, function(err) {
Expand Down Expand Up @@ -286,9 +284,7 @@ RedisPersistence.prototype.storeSubscriptions = function(client, cb) {
.pexpire(clientSubKey, this.options.ttl.subscriptions);

Object.keys(subscriptions).forEach(function(e) {
if (that._subMatcher.match(e).indexOf(client.id) < 0) {
that._subMatcher.add(e, client.id);
}
that._subMatcher.add(e, client.id);
});

op.exec(cb);
Expand Down Expand Up @@ -353,7 +349,7 @@ RedisPersistence.prototype.storeOfflinePacket = function(packet, done) {
var that = this;

var matches = this._subMatcher.match(packet.topic);
async.each(matches, function(client, cb) {
async.each(Array.from(matches), function(client, cb) {
that._storePacket(client, packet, cb);
}, done);
};
Expand Down

0 comments on commit 17e209f

Please sign in to comment.