Skip to content

Commit

Permalink
Merge pull request #7 from primus/ioredis
Browse files Browse the repository at this point in the history
Ditch leverage and redis in favor of ioredis
  • Loading branch information
3rd-Eden committed Jul 7, 2015
2 parents 256f874 + bdc6618 commit 25ce3ec
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 29 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ this plugin:
- *redis*: Metroplex is currently using Redis as its default back-end for storing
the state of the connections. If you do not supply us with a pre-defined Redis
client (or authorized) we will create a Redis client which only connects to
localhost and Redis's default port number.
localhost and Redis's default port number. When provided this must be an
[`ioredis`](https://github.com/luin/ioredis) client.
- *namespace*: As the databases are usually shared with other programs it's good
to prefix all the data that you store, in Metroplex we prefix every key with
the set namespace. The default namespace is `metroplex`.
Expand Down
42 changes: 21 additions & 21 deletions metroplex.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
'use strict';

var Leverage = require('leverage')
, https = require('https')
var https = require('https')
, fuse = require('fusing')
, ip = require('ip');

//
// Ensure that the directory for our custom lua scripts is set correctly.
//
Leverage.scripts = Leverage.scripts.concat(
Leverage.introduce(require('path').join(__dirname, 'redis'), Leverage.prototype)
);
, path = require('path')
, ip = require('ip')
, fs = require('fs');

/**
* Add defaults to the supplied options. The following options are available:
Expand All @@ -32,17 +26,20 @@ function Metroplex(primus, options) {
options = options || {};
primus = primus || {};

var parsed = this.parse(primus.server);
var lua = fs.readFileSync(path.join(__dirname, 'redis/annihilate.lua'), 'utf8')
, parsed = this.parse(primus.server);

this.fuse();

this.redis = options.redis || require('redis').createClient();
this.redis = options.redis || new require('ioredis')();
this.namespace = (options.namespace || 'metroplex') +':';
this.interval = options.interval || 5 * 60 * 1000;
this.timeout = options.timeout || 30 * 60;
this.latency = options.latency || 2000;
this.leverage = new Leverage(this.redis, {
namespace: this.namespace

this.redis.defineCommand('annihilate', {
lua: lua.replace('{leverage::namespace}', this.namespace),
numberOfKeys: 1
});

if (parsed || options.address) {
Expand Down Expand Up @@ -90,21 +87,22 @@ Metroplex.readable('parse', function parse(server) {
* @api public
*/
Metroplex.readable('register', function register(address, fn) {
var metroplex = this;
var redis = this.redis
, metroplex = this;

metroplex.address = this.parse(address || metroplex.address);
metroplex.address = this.parse(address);
if (!metroplex.address) {
if (fn) fn();
return this;
}

metroplex.leverage.annihilate(metroplex.address, function annihilate(err) {
redis.annihilate(metroplex.address, function annihilate(err) {
if (err) {
if (fn) return fn(err);
return metroplex.emit('error', err);
}

metroplex.redis.multi()
redis.multi()
.psetex(metroplex.namespace + metroplex.address, metroplex.interval, Date.now())
.sadd(metroplex.namespace +'servers', metroplex.address)
.exec(function register(err) {
Expand All @@ -119,6 +117,8 @@ Metroplex.readable('register', function register(address, fn) {
if (fn) fn(err, metroplex.address);
});
});

return this;
});

/**
Expand All @@ -138,7 +138,7 @@ Metroplex.readable('unregister', function unregister(address, fn) {
return this;
}

metroplex.leverage.annihilate(address, function annihilate(err) {
metroplex.redis.annihilate(address, function annihilate(err) {
if (err) {
if (fn) return fn(err);
return metroplex.emit('error', err);
Expand Down Expand Up @@ -200,7 +200,7 @@ Metroplex.readable('servers', function servers(self, fn) {
self = 0;
}

this.redis.smembers(this.namespace +'servers', function smembers(err, members) {
metroplex.redis.smembers(this.namespace +'servers', function smembers(err, members) {
if (self) return fn(err, members);

fn(err, (members || []).filter(function filter(address) {
Expand Down Expand Up @@ -265,7 +265,7 @@ Metroplex.readable('setInterval', function setIntervals() {
redis.get(metroplex.namespace + address, function get(err, stamp) {
if (err || Date.now() - +stamp < metroplex.interval) return;

metroplex.leverage.annihilate(address, function murdered(err) {
redis.annihilate(address, function murdered(err) {
if (err) return metroplex.emit('error', err);
});
});
Expand Down
7 changes: 3 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,15 @@
"mocha": "2.2.x",
"omega-supreme": "0.0.x",
"pre-commit": "1.0.x",
"primus": "3.0.x",
"primus": "3.1.x",
"istanbul": "0.3.x",
"ws": "0.7.x"
},
"dependencies": {
"async": "1.0.x",
"async": "1.3.x",
"eventemitter3": "1.1.x",
"fusing": "1.0.x",
"ip": "0.3.x",
"leverage": "0.1.x",
"redis": "0.12.x"
"ioredis": "1.5.x"
}
}
4 changes: 2 additions & 2 deletions redis/annihilate.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ local address = assert(KEYS[1], 'The server address is missing')

--
-- Get all the sparks for our given address so we can nuke them from our "global"
-- spark registry>
-- spark registry.
--
local sparks = redis.call('SMEMBERS', namespace .. address ..':sparks')

Expand All @@ -20,7 +20,7 @@ end

--
-- Delete all left over references to this server address which are:
--
--
-- 1. Our dedicated sparks set
-- 2. Our server in the servers list
-- 3. The keep alive server update
Expand Down
2 changes: 1 addition & 1 deletion test/integration.test.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
describe('plugin', function () {
'use strict';

var redis = require('redis').createClient()
var redis = new require('ioredis')()
, assume = require('assume')
, Primus = require('primus')
, metroplex = require('../');
Expand Down

0 comments on commit 25ce3ec

Please sign in to comment.