Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Commit

Permalink
Merge branch 'ioredis' of https://github.com/adpdigital/mosca into v2…
Browse files Browse the repository at this point in the history
…-dev
  • Loading branch information
mcollina committed May 2, 2016
2 parents ac276fb + 7ffb871 commit 9eaa907
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 49 deletions.
88 changes: 43 additions & 45 deletions lib/persistence/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ OTHER DEALINGS IN THE SOFTWARE.
"use strict";

var AbstractPersistence = require("./abstract");
var redis = require("redis");
var Redis = require("ioredis");
var util = require("util");
var Matcher = require("./matcher");
var async = require("async");
Expand Down Expand Up @@ -114,37 +114,15 @@ function RedisPersistence(options, callback) {
});
}

var redisError = null;
var redisVersions = that._client.server_info.versions;
if ( (redisVersions[0] * 1000 + redisVersions[1]) < 2006 ) {
redisError = 'redis instance version should be no less than 2.6';
}

if (cb) {
cb(redisError);
} else {
if (redisError) {
throw redisError;
}
cb();
}
});
};

var that = this;

this._pubSubClient.subscribe(this.options.channel);

this._pubSubClient.on("message", function(channel, message) {
if (that._explicitlyClosed()) {
return;
}
var parsed = JSON.parse(message);
if (parsed.process !== that._id) {
newSub(parsed.key, parsed.unsubs);
}
});

this._pubSubClient.on("subscribe", function() {
this._pubSubClient.subscribe(this.options.channel, function(){
if (that._explicitlyClosed()) {
return;
}
Expand All @@ -161,6 +139,16 @@ function RedisPersistence(options, callback) {
});
});
});

this._pubSubClient.on("message", function(channel, message) {
if (that._explicitlyClosed()) {
return;
}
var parsed = JSON.parse(message);
if (parsed.process !== that._id) {
newSub(parsed.key, parsed.unsubs);
}
});
}

util.inherits(RedisPersistence, AbstractPersistence);
Expand All @@ -172,21 +160,25 @@ util.inherits(RedisPersistence, AbstractPersistence);
*/

RedisPersistence.prototype._buildClient = function() {
var options = this.options;
var client = redis.createClient(
options.port || 6379,
options.host || "127.0.0.1",
options.redisOptions);

if (options.db) {
client.select(options.db);
var options = this.options.redisOptions || {};

if (this.options.host) {
options.host = this.options.host;
}

if (this.options.port) {
options.port = this.options.port;
}

if (this.options.db) {
options.db = this.options.db;
}

if (options.password) {
client.auth(options.password);
if (this.options.password) {
options.password = this.options.password;
}

return client;
return new Redis(options);
};

RedisPersistence.prototype.storeRetained = function(packet, cb) {
Expand Down Expand Up @@ -325,11 +317,10 @@ RedisPersistence.prototype.lookupSubscriptions = function(client, cb) {
var multi = this._client.multi();
var that = this;

multi.get(key, function(err, result) {
subscriptions = JSON.parse(result) || {};
});
multi.get(key);

multi.exec(function(err) {
multi.exec(function(err, result) {
subscriptions = JSON.parse(result[0][1]) || {};
cb(err, subscriptions);
});
}
Expand Down Expand Up @@ -393,7 +384,17 @@ RedisPersistence.prototype.streamOfflinePackets = function(client, cb, done) {
}

function fetch(multi, key) {
multi.get(key, function(err, result) {
return multi.get(key);
}

results.reduce(fetch, that._client.multi()).exec(function(err,multiResults){
if(!multiResults && done) {
done(err);
return;
}
multiResults.forEach(function(multiResult, i){
var key = results[i];
var result = multiResult[1];
total --;
// If we don't get result for given packet key. It means
// that packet has expired. Just clean it from client packets key
Expand All @@ -406,10 +407,7 @@ RedisPersistence.prototype.streamOfflinePackets = function(client, cb, done) {
}
emit(key, result);
});
return multi;
}

results.reduce(fetch, that._client.multi()).exec();
});
});
};

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@
"leveldown": "~1.4.3",
"zmq": "~2.14.0",
"amqp": "~0.2.4",
"redis": "~2.5.0",
"ioredis": "^1.15.1",
"hiredis": "^0.4.1",
"mongodb": "~2.1.4"
}
Expand Down
2 changes: 1 addition & 1 deletion test/persistence/redis_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

var abstract = require("./abstract");
var Redis = require("../../").persistence.Redis;
var redis = require("redis");
var redis = require("ioredis");

describe("mosca.persistence.Redis", function() {

Expand Down
4 changes: 2 additions & 2 deletions test/server_redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ var mqtt = require("mqtt");
var async = require("async");
var ascoltatori = require("ascoltatori");
var abstractServerTests = require("./abstract_server");
var redis = require("redis");
var redis = require("ioredis");
var createConnection = require("./helpers/createConnection");

describe("mosca.Server with redis persistence", function() {
Expand Down Expand Up @@ -33,7 +33,7 @@ describe("mosca.Server with redis persistence", function() {
},
persistence : {
factory: mosca.persistence.Redis
},
}
};
}

Expand Down

0 comments on commit 9eaa907

Please sign in to comment.