Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 24 additions & 27 deletions lib/databases/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ function Redis(options) {
util.inherits(Redis, Store);

// helpers
function handleResultSet (err, res, callback) {
function handleResultSet(err, res, callback) {
if (err) {
debug(err);
return callback(err);
Expand All @@ -63,7 +63,7 @@ function handleResultSet (err, res, callback) {
}
var arr = [];

res.forEach(function(item) {
res.forEach(function (item) {
arr.push(jsondate.parse(item));
});

Expand Down Expand Up @@ -145,7 +145,7 @@ _.extend(Redis.prototype, {
this.heartbeatInterval = setInterval(function () {
var graceTimer = setTimeout(function () {
if (self.heartbeatInterval) {
console.error((new Error ('Heartbeat timeouted after ' + gracePeriod + 'ms (redis)')).stack);
console.error((new Error('Heartbeat timeouted after ' + gracePeriod + 'ms (redis)')).stack);
self.disconnect();
}
}, gracePeriod);
Expand Down Expand Up @@ -177,7 +177,7 @@ _.extend(Redis.prototype, {
self.client.del('nextItemId:' + self.options.prefix, callback);
},
function (callback) {
self.client.keys(self.options.prefix + ':*', function(err, keys) {
self.client.keys(self.options.prefix + ':*', function (err, keys) {
if (err) {
return callback(err);
}
Expand All @@ -194,8 +194,8 @@ _.extend(Redis.prototype, {
});
},

getNewId: function(callback) {
this.client.incr('nextItemId:' + this.options.prefix, function(err, id) {
getNewId: function (callback) {
this.client.incr('nextItemId:' + this.options.prefix, function (err, id) {
if (err) {
debug(err);
return callback(err);
Expand Down Expand Up @@ -251,13 +251,13 @@ _.extend(Redis.prototype, {
cursor = 0;
}

(function scanRecursive (curs) {
(function scanRecursive(curs) {
self.client.scan(curs, 'match', key, function (err, res) {
if (err) {
return callback(err);
}

function next () {
function next() {
if (res[0] === '0') {
callback(null);
} else {
Expand Down Expand Up @@ -532,7 +532,7 @@ _.extend(Redis.prototype, {
);
},

addSnapshot: function(snap, callback) {
addSnapshot: function (snap, callback) {
if (!snap.aggregateId) {
var errMsg = 'aggregateId not defined!';
debug(errMsg);
Expand Down Expand Up @@ -580,37 +580,34 @@ _.extend(Redis.prototype, {
return s;
}).reverse();

if (revMax > -1) {
allKeys = allKeys.slice(0, revMax);
if (revMax === -1) { // by default the last snapshot is kept
allKeys = allKeys.slice(0, 1);
}

if (allKeys.length === 0) {
return callback(null, null);
}

async.map(allKeys, function (key, callback) {
// iterating recursively over snapshots, from latest to oldest
(function nextSnapshot(key) {
self.client.get(key, function (err, res) {
if (err) {
debug(err);
return callback(err);
}

callback(null, jsondate.parse(res));
});
}, function (err, res) {
if (err) {
debug(err);
return callback(err);
}

var found = _.find(res, function (s) {
if (revMax > -1 && s.revision > revMax) {
return false;
var snapshot = jsondate.parse(res);
if (revMax > -1 && snapshot.revision > revMax) {
if (allKeys.length > 0) {
nextSnapshot(allKeys.shift());
} else {
callback(null, null);
}
} else {
callback(null, snapshot);
}
return true;
});

callback(null, found);
});
})(allKeys.shift());
}
);
}
Expand Down