Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Commit

Permalink
Merge pull request #518 from adpdigital/fixLoadSubs
Browse files Browse the repository at this point in the history
fix loadSubs callback issue & use pipeline to get subscriptions
  • Loading branch information
mcollina committed Jul 11, 2016
2 parents 474da44 + 7b79508 commit 6e566d0
Showing 1 changed file with 33 additions and 12 deletions.
45 changes: 33 additions & 12 deletions lib/persistence/redis.js
Expand Up @@ -98,6 +98,8 @@ function RedisPersistence(options, callback) {
if (!result || typeof subs !== 'object') {
if (!retried) {
setTimeout(fetchAndUpdateLocalSub.bind(null, key, unsubs, true, cb), 500);
} else {
cb && cb();
}
return;
}
Expand Down Expand Up @@ -132,24 +134,43 @@ function RedisPersistence(options, callback) {
return;
}
var subsStream = that._client.scanStream({
match: "client:sub:*"
match: "client:sub:*",
count: 25000
});
var keys = [];
var pipeline = that._client.pipeline();
var total = 0;
var done = null;

subsStream.on('data', function(moreKeys){
for( var i=0; i<moreKeys.length; i++) {
keys.push(moreKeys[i]);
}
total += moreKeys.length;
moreKeys.map(function(k){
pipeline.get(k, function(err, result) {
if (err) {
done && done(err);
return;
}
var subs = JSON.parse(result);
if (!result || typeof subs !== 'object') {
done && done();
return;
}
updateLocalSub(k, subs);
done && done();
});
});
});

subsStream.on('end', function(){
steed.each(keys, function(k,next){
fetchAndUpdateLocalSub(k,null,false,next);
}, function(err) {
if (callback) {
callback(err, that);
if (total === 0) {
return callback(null, that);
}
done = function() {
if (--total === 0 && callback) {
callback(null, that);
callback = null;
return;
}
});
};
pipeline.exec();
});
});

Expand Down

0 comments on commit 6e566d0

Please sign in to comment.