Skip to content

Commit

Permalink
[major] Add ability to retrieve the sparks of a given server
Browse files Browse the repository at this point in the history
  • Loading branch information
lpinca committed Jul 9, 2015
1 parent 25ce3ec commit 5300a6f
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 61 deletions.
54 changes: 31 additions & 23 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Metroplex a Redis based spark/connection registry for Primus.

## Installation

Metroplex is released in the npm registry and can therefor be installed using:
Metroplex is released in the npm registry and can therefore be installed using:

```
npm install --save metroplex
Expand Down Expand Up @@ -43,15 +43,15 @@ this plugin:
node process is still alive. The interval determines the interval of these
updates. When the interval is reached we update the key in the database with
the current EPOCH as well as start a scan for possible dead servers and
removing them. The default interval `300000` ms
removing them. The default interval `300000` ms.
- *latency*: The maximum time it would take to update the `alive` key in Redis.
This time is subtracted from the set `interval` so we update the key BEFORE
it expires. Defaults to `2000` ms.
- *address* The address or public URL on which this SPECIFIC server is
reachable. Should be without path name. When nothing is supplied we try to be
somewhat smart and read the address and port and server type from the server
that Primus is attached to and compose an URL like: `http://0.0.0.0:8080` from
it.
somewhat smart and read the address, the port and the type of the server from
the server that Primus is attached to and compose an URL like:
`http://0.0.0.0:8080` from it.

These options should be provided in the options object of the Primus server:

Expand Down Expand Up @@ -82,41 +82,49 @@ The following **public** methods are available.
#### metroplex.servers

```js
metroplex.servers(fn)
metroplex.servers(self, sparks, fn)
```

List all the servers in our current registry.
This method returns all the servers in the registry or the servers for the
given spark ids. It takes the following arguments:

##### self

An optional boolean flag to specify if the result should include the current
server or not. It defaults to `false` and has no effect if the `sparks`
argument is provided.

```js
metroplex.servers(function (err, servers) {
primus.metroplex(true, function (err, servers) {
// `servers` is an array with all the servers in the registry
console.log(servers);
});
```

#### metroplex.spark

```js
metroplex.spark(id, fn)
```
##### sparks

Get the server for the given spark id. It does not check if the spark is hosted
on the current server. That's up to the developer to implement.
A spark id or an array of spark ids. If this argument is provided the method
returns only the server for the given spark ids. We don't check if the spark
id/s is/are hosted on the current server. It's up to the developer to prevent
useless database calls.

```js
metroplex.spark(id, function (err, server) {
console.log(server);
metroplex.servers(['ad8a-280z-18', 'y97x-42480-13'], function (err, servers) {
// `servers` is an array with the servers of the two sparks.
console.log(servers);
});
```

#### metroplex.sparks

```js
metroplex.sparks(sparks, fn)
metroplex.servers('ad8a-280z-18', function (err, address) {
// `address` is the server of the given spark.
console.log(address);
});
```

Get the servers for each id in the given `sparks` array. It will return an
object and just like `metroplex.spark` it does not check if the spark is hosted
on the current server.
##### fn

A callback function that follows the usual error first pattern.

### Omega Supreme integration

Expand Down
60 changes: 31 additions & 29 deletions metroplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -186,56 +186,58 @@ Metroplex.readable('disconnect', function disconnect(spark) {
});

/**
* Get all current registered servers except our selfs.
* Get all the servers in the registry or the servers for the given spark id(s).
*
* @param {Function} fn Callback
* @param {Boolean} [self] Whether to include ourselves in the result.
* @param {String|Array<String>} [sparks] A spark id or an array of spark ids.
* @param {Function} fn Callback.
* @returns {Metroplex}
* @api public
*/
Metroplex.readable('servers', function servers(self, fn) {
var metroplex = this;
Metroplex.readable('servers', function servers(self, sparks, fn) {
var namespace = this.namespace
, redis = this.redis
, metroplex = this;

if ('boolean' !== typeof self) {
fn = self;
self = 0;
fn = sparks;
sparks = self;
self = false;
}

metroplex.redis.smembers(this.namespace +'servers', function smembers(err, members) {
if (self) return fn(err, members);
if ('function' === typeof sparks) {
fn = sparks;
sparks = null;
}

fn(err, (members || []).filter(function filter(address) {
return address !== metroplex.address;
}));
});
if (!sparks) {
redis.smembers(namespace +'servers', function smembers(err, members) {
if (self) return fn(err, members);

return this;
});
fn(err, (members || []).filter(function filter(address) {
return address !== metroplex.address;
}));
});
} else if (Array.isArray(sparks)) {
redis.hmget.apply(redis, [namespace +'sparks'].concat(sparks).concat(fn));
} else {
redis.hget(namespace +'spark', sparks, fn);
}

/**
* Get the server address for a given spark id.
*
* @param {String} id The spark id who's server address we want to retrieve.
* @param {Function} fn Callback
* @returns {Metroplex}
* @api public
*/
Metroplex.readable('spark', function spark(id, fn) {
this.redis.hget(this.namespace +'sparks', id, fn);
return this;
});

/**
* Get all server addresses for the given spark ids.
* Get all the spark ids for the given server address.
*
* @param {Array} ids The spark id's we need to look up
* @param {String} address The server address.
* @param {Function} fn Callback.
* @returns {Metroplex}
* @api public
*/
Metroplex.readable('sparks', function sparks(ids, fn) {
var key = this.namespace +'sparks';
Metroplex.readable('sparks', function sparks(address, fn) {
this.redis.smembers(this.namespace + this.address +':sparks', fn);

this.redis.hmget.apply(this.redis, [key].concat(ids).concat(fn));
return this;
});

Expand Down
8 changes: 4 additions & 4 deletions omega.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ module.exports = function forwards(primus) {
forward.sparks = function sparks(ids, msg, fn) {
fn = fn || nope;

metroplex.sparks(ids, function sparks(err, servers) {
metroplex.servers(ids, function sparks(err, servers) {
if (err) return fn(err);

servers = Object.keys(servers).reduce(function fn(memo, spark) {
Expand Down Expand Up @@ -81,9 +81,9 @@ module.exports = function forwards(primus) {
};

/**
* Forward the message to a specific spark
* Forward the message to a specific spark.
*
* @param {String} id Spark id
* @param {String} id Spark id.
* @param {Mixed} msg Message to broadcast.
* @param {Function} fn Completion callback.
* @returns {Forward}
Expand All @@ -92,7 +92,7 @@ module.exports = function forwards(primus) {
forward.spark = function spark(id, msg, fn) {
fn = fn || nope;

metroplex.spark(id, function spark(err, server) {
metroplex.servers(id, function spark(err, server) {
if (err) return fn(err);

forward(server, msg, id, fn);
Expand Down
43 changes: 38 additions & 5 deletions test/integration.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -157,19 +157,21 @@ describe('plugin', function () {
http.listen(portnumber);
});

it('finds servers for a list of sparks', function (next) {
it('finds the servers for a list of sparks', function (next) {
server.use('metroplex', metroplex);
server2.use('metroplex', metroplex);

var clients = []
var Socket2 = server2.Socket
, Socket = server.Socket
, clients = []
, length = 10;

function push(address) {
return function (id) {
clients.push({ id: id, address: address });

if (clients.length === length) {
server.metroplex.sparks(clients.map(function (client) {
server.metroplex.servers(clients.map(function (client) {
return client.id;
}), function (err, addresses) {
if (err) return next(err);
Expand All @@ -186,11 +188,42 @@ describe('plugin', function () {

server.once('register', function (address) {
var len = length / 2;
while (len--) new server.Socket(address).id(push(address));
while (len--) new Socket(address).id(push(address));
});
server2.once('register', function (address) {
var len = length / 2;
while (len--) new server2.Socket(address).id(push(address));
while (len--) new Socket2(address).id(push(address));
});
});

it('finds the sparks in a server', function (next) {
server.use('metroplex', metroplex);

var Socket = server.Socket
, clients = []
, length = 10;

function push(address) {
return function (id) {
clients.push(id);

if (clients.length === length) {
server.metroplex.sparks(address, function (err, sparks) {
if (err) return next(err);

clients.forEach(function (id) {
assume(sparks.indexOf(id)).is.above(-1);
});

next();
});
}
};
}

server.once('register', function (address) {
var len = length;
while (len--) new Socket(address).id(push(address));
});
});
});

0 comments on commit 5300a6f

Please sign in to comment.