Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to retrieve the sparks of a given server #8

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 42 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Metroplex a Redis based spark/connection registry for Primus.

## Installation

Metroplex is released in the npm registry and can therefor be installed using:
Metroplex is released in the npm registry and can therefore be installed using:

```
npm install --save metroplex
Expand Down Expand Up @@ -43,15 +43,15 @@ this plugin:
node process is still alive. The interval determines the interval of these
updates. When the interval is reached we update the key in the database with
the current EPOCH as well as start a scan for possible dead servers and
removing them. The default interval `300000` ms
removing them. The default interval `300000` ms.
- *latency*: The maximum time it would take to update the `alive` key in Redis.
This time is subtracted from the set `interval` so we update the key BEFORE
it expires. Defaults to `2000` ms.
- *address* The address or public URL on which this SPECIFIC server is
reachable. Should be without path name. When nothing is supplied we try to be
somewhat smart and read the address and port and server type from the server
that Primus is attached to and compose an URL like: `http://0.0.0.0:8080` from
it.
somewhat smart and read the address, the port and the type of the server from
the server that Primus is attached to and compose an URL like:
`http://0.0.0.0:8080` from it.

These options should be provided in the options object of the Primus server:

Expand Down Expand Up @@ -82,41 +82,64 @@ The following **public** methods are available.
#### metroplex.servers

```js
metroplex.servers(fn)
metroplex.servers(self, sparks, fn)
```

List all the servers in our current registry.
This method returns all the servers in the registry or the servers for the
given spark ids. It takes the following arguments:

##### self

An optional boolean flag to specify if the result should include the current
server or not. It defaults to `false` and has no effect if the `sparks`
argument is provided.

```js
metroplex.servers(function (err, servers) {
metroplex.servers(true, function (err, servers) {
// `servers` is an array with all the servers in the registry.
console.log(servers);
});
```

#### metroplex.spark
##### sparks

A spark id or an array of spark ids. If this argument is provided the method
returns only the server for the given spark ids. We don't check if the spark
id/s is/are hosted on the current server. It's up to the developer to prevent
useless database calls.

```js
metroplex.spark(id, fn)
metroplex.servers(['ad8a-280z-18', 'y97x-42480-13'], function (err, servers) {
// `servers` is an array with the servers of the two sparks.
console.log(servers);
});
```

Get the server for the given spark id. It does not check if the spark is hosted
on the current server. That's up to the developer to implement.

```js
metroplex.spark(id, function (err, server) {
console.log(server);
metroplex.servers('ad8a-280z-18', function (err, address) {
// `address` is the server of the given spark.
console.log(address);
});
```

##### fn

A callback function that follows the usual error first pattern.

#### metroplex.sparks

```js
metroplex.sparks(sparks, fn)
metroplex.sparks(address, fn)
```

Get the servers for each id in the given `sparks` array. It will return an
object and just like `metroplex.spark` it does not check if the spark is hosted
on the current server.
This method returns all the spark ids for the given server address.

```js
metroplex.sparks('http://192.168.0.10:3000', function (err, ids) {
// `ids` is an array of spark ids.
console.log(ids);
});
```

### Omega Supreme integration

Expand Down
60 changes: 31 additions & 29 deletions metroplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -186,56 +186,58 @@ Metroplex.readable('disconnect', function disconnect(spark) {
});

/**
* Get all current registered servers except our selfs.
* Get all the servers in the registry or the servers for the given spark id(s).
*
* @param {Function} fn Callback
* @param {Boolean} [self] Whether to include ourselves in the result.
* @param {String|Array<String>} [sparks] A spark id or an array of spark ids.
* @param {Function} fn Callback.
* @returns {Metroplex}
* @api public
*/
Metroplex.readable('servers', function servers(self, fn) {
var metroplex = this;
Metroplex.readable('servers', function servers(self, sparks, fn) {
var namespace = this.namespace
, redis = this.redis
, metroplex = this;

if ('boolean' !== typeof self) {
fn = self;
self = 0;
fn = sparks;
sparks = self;
self = false;
}

metroplex.redis.smembers(this.namespace +'servers', function smembers(err, members) {
if (self) return fn(err, members);
if ('function' === typeof sparks) {
fn = sparks;
sparks = null;
}

fn(err, (members || []).filter(function filter(address) {
return address !== metroplex.address;
}));
});
if (!sparks) {
redis.smembers(namespace +'servers', function smembers(err, members) {
if (self) return fn(err, members);

return this;
});
fn(err, (members || []).filter(function filter(address) {
return address !== metroplex.address;
}));
});
} else if (Array.isArray(sparks)) {
redis.hmget.apply(redis, [namespace +'sparks'].concat(sparks).concat(fn));
} else {
redis.hget(namespace +'spark', sparks, fn);
}

/**
* Get the server address for a given spark id.
*
* @param {String} id The spark id who's server address we want to retrieve.
* @param {Function} fn Callback
* @returns {Metroplex}
* @api public
*/
Metroplex.readable('spark', function spark(id, fn) {
this.redis.hget(this.namespace +'sparks', id, fn);
return this;
});

/**
* Get all server addresses for the given spark ids.
* Get all the spark ids for the given server address.
*
* @param {Array} ids The spark id's we need to look up
* @param {String} address The server address.
* @param {Function} fn Callback.
* @returns {Metroplex}
* @api public
*/
Metroplex.readable('sparks', function sparks(ids, fn) {
var key = this.namespace +'sparks';
Metroplex.readable('sparks', function sparks(address, fn) {
this.redis.smembers(this.namespace + this.address +':sparks', fn);

this.redis.hmget.apply(this.redis, [key].concat(ids).concat(fn));
return this;
});

Expand Down
8 changes: 4 additions & 4 deletions omega.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ module.exports = function forwards(primus) {
forward.sparks = function sparks(ids, msg, fn) {
fn = fn || nope;

metroplex.sparks(ids, function sparks(err, servers) {
metroplex.servers(ids, function sparks(err, servers) {
if (err) return fn(err);

servers = Object.keys(servers).reduce(function fn(memo, spark) {
Expand Down Expand Up @@ -81,9 +81,9 @@ module.exports = function forwards(primus) {
};

/**
* Forward the message to a specific spark
* Forward the message to a specific spark.
*
* @param {String} id Spark id
* @param {String} id Spark id.
* @param {Mixed} msg Message to broadcast.
* @param {Function} fn Completion callback.
* @returns {Forward}
Expand All @@ -92,7 +92,7 @@ module.exports = function forwards(primus) {
forward.spark = function spark(id, msg, fn) {
fn = fn || nope;

metroplex.spark(id, function spark(err, server) {
metroplex.servers(id, function spark(err, server) {
if (err) return fn(err);

forward(server, msg, id, fn);
Expand Down
43 changes: 38 additions & 5 deletions test/integration.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -157,19 +157,21 @@ describe('plugin', function () {
http.listen(portnumber);
});

it('finds servers for a list of sparks', function (next) {
it('finds the servers for a list of sparks', function (next) {
server.use('metroplex', metroplex);
server2.use('metroplex', metroplex);

var clients = []
var Socket2 = server2.Socket
, Socket = server.Socket
, clients = []
, length = 10;

function push(address) {
return function (id) {
clients.push({ id: id, address: address });

if (clients.length === length) {
server.metroplex.sparks(clients.map(function (client) {
server.metroplex.servers(clients.map(function (client) {
return client.id;
}), function (err, addresses) {
if (err) return next(err);
Expand All @@ -186,11 +188,42 @@ describe('plugin', function () {

server.once('register', function (address) {
var len = length / 2;
while (len--) new server.Socket(address).id(push(address));
while (len--) new Socket(address).id(push(address));
});
server2.once('register', function (address) {
var len = length / 2;
while (len--) new server2.Socket(address).id(push(address));
while (len--) new Socket2(address).id(push(address));
});
});

it('finds the sparks in a server', function (next) {
server.use('metroplex', metroplex);

var Socket = server.Socket
, clients = []
, length = 10;

function push(address) {
return function (id) {
clients.push(id);

if (clients.length === length) {
server.metroplex.sparks(address, function (err, sparks) {
if (err) return next(err);

clients.forEach(function (id) {
assume(sparks.indexOf(id)).is.above(-1);
});

next();
});
}
};
}

server.once('register', function (address) {
var len = length;
while (len--) new Socket(address).id(push(address));
});
});
});