From 5300a6f2306da15fbcf1a3e55439c4de5df5bcc7 Mon Sep 17 00:00:00 2001 From: Luigi Pinca Date: Thu, 9 Jul 2015 15:27:20 +0200 Subject: [PATCH] [major] Add ability to retrieve the sparks of a given server --- README.md | 54 +++++++++++++++++++++--------------- metroplex.js | 60 +++++++++++++++++++++------------------- omega.js | 8 +++--- test/integration.test.js | 43 ++++++++++++++++++++++++---- 4 files changed, 104 insertions(+), 61 deletions(-) diff --git a/README.md b/README.md index f70599f..124f8c0 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ Metroplex a Redis based spark/connection registry for Primus. ## Installation -Metroplex is released in the npm registry and can therefor be installed using: +Metroplex is released in the npm registry and can therefore be installed using: ``` npm install --save metroplex @@ -43,15 +43,15 @@ this plugin: node process is still alive. The interval determines the interval of these updates. When the interval is reached we update the key in the database with the current EPOCH as well as start a scan for possible dead servers and - removing them. The default interval `300000` ms + removing them. The default interval `300000` ms. - *latency*: The maximum time it would take to update the `alive` key in Redis. This time is subtracted from the set `interval` so we update the key BEFORE it expires. Defaults to `2000` ms. - *address* The address or public URL on which this SPECIFIC server is reachable. Should be without path name. When nothing is supplied we try to be - somewhat smart and read the address and port and server type from the server - that Primus is attached to and compose an URL like: `http://0.0.0.0:8080` from - it. + somewhat smart and read the address, the port and the type of the server from + the server that Primus is attached to and compose an URL like: + `http://0.0.0.0:8080` from it. These options should be provided in the options object of the Primus server: @@ -82,41 +82,49 @@ The following **public** methods are available. #### metroplex.servers ```js -metroplex.servers(fn) +metroplex.servers(self, sparks, fn) ``` -List all the servers in our current registry. +This method returns all the servers in the registry or the servers for the +given spark ids. It takes the following arguments: + +##### self + +An optional boolean flag to specify if the result should include the current +server or not. It defaults to `false` and has no effect if the `sparks` +argument is provided. ```js -metroplex.servers(function (err, servers) { +primus.metroplex(true, function (err, servers) { + // `servers` is an array with all the servers in the registry console.log(servers); }); ``` -#### metroplex.spark - -```js -metroplex.spark(id, fn) -``` +##### sparks -Get the server for the given spark id. It does not check if the spark is hosted -on the current server. That's up to the developer to implement. +A spark id or an array of spark ids. If this argument is provided the method +returns only the server for the given spark ids. We don't check if the spark +id/s is/are hosted on the current server. It's up to the developer to prevent +useless database calls. ```js -metroplex.spark(id, function (err, server) { - console.log(server); +metroplex.servers(['ad8a-280z-18', 'y97x-42480-13'], function (err, servers) { + // `servers` is an array with the servers of the two sparks. + console.log(servers); }); ``` -#### metroplex.sparks - ```js -metroplex.sparks(sparks, fn) +metroplex.servers('ad8a-280z-18', function (err, address) { + // `address` is the server of the given spark. + console.log(address); +}); ``` -Get the servers for each id in the given `sparks` array. It will return an -object and just like `metroplex.spark` it does not check if the spark is hosted -on the current server. +##### fn + +A callback function that follows the usual error first pattern. ### Omega Supreme integration diff --git a/metroplex.js b/metroplex.js index 2936d3e..345e9a7 100644 --- a/metroplex.js +++ b/metroplex.js @@ -186,56 +186,58 @@ Metroplex.readable('disconnect', function disconnect(spark) { }); /** - * Get all current registered servers except our selfs. + * Get all the servers in the registry or the servers for the given spark id(s). * - * @param {Function} fn Callback + * @param {Boolean} [self] Whether to include ourselves in the result. + * @param {String|Array} [sparks] A spark id or an array of spark ids. + * @param {Function} fn Callback. * @returns {Metroplex} * @api public */ -Metroplex.readable('servers', function servers(self, fn) { - var metroplex = this; +Metroplex.readable('servers', function servers(self, sparks, fn) { + var namespace = this.namespace + , redis = this.redis + , metroplex = this; if ('boolean' !== typeof self) { - fn = self; - self = 0; + fn = sparks; + sparks = self; + self = false; } - metroplex.redis.smembers(this.namespace +'servers', function smembers(err, members) { - if (self) return fn(err, members); + if ('function' === typeof sparks) { + fn = sparks; + sparks = null; + } - fn(err, (members || []).filter(function filter(address) { - return address !== metroplex.address; - })); - }); + if (!sparks) { + redis.smembers(namespace +'servers', function smembers(err, members) { + if (self) return fn(err, members); - return this; -}); + fn(err, (members || []).filter(function filter(address) { + return address !== metroplex.address; + })); + }); + } else if (Array.isArray(sparks)) { + redis.hmget.apply(redis, [namespace +'sparks'].concat(sparks).concat(fn)); + } else { + redis.hget(namespace +'spark', sparks, fn); + } -/** - * Get the server address for a given spark id. - * - * @param {String} id The spark id who's server address we want to retrieve. - * @param {Function} fn Callback - * @returns {Metroplex} - * @api public - */ -Metroplex.readable('spark', function spark(id, fn) { - this.redis.hget(this.namespace +'sparks', id, fn); return this; }); /** - * Get all server addresses for the given spark ids. + * Get all the spark ids for the given server address. * - * @param {Array} ids The spark id's we need to look up + * @param {String} address The server address. * @param {Function} fn Callback. * @returns {Metroplex} * @api public */ -Metroplex.readable('sparks', function sparks(ids, fn) { - var key = this.namespace +'sparks'; +Metroplex.readable('sparks', function sparks(address, fn) { + this.redis.smembers(this.namespace + this.address +':sparks', fn); - this.redis.hmget.apply(this.redis, [key].concat(ids).concat(fn)); return this; }); diff --git a/omega.js b/omega.js index 807535b..3102213 100644 --- a/omega.js +++ b/omega.js @@ -53,7 +53,7 @@ module.exports = function forwards(primus) { forward.sparks = function sparks(ids, msg, fn) { fn = fn || nope; - metroplex.sparks(ids, function sparks(err, servers) { + metroplex.servers(ids, function sparks(err, servers) { if (err) return fn(err); servers = Object.keys(servers).reduce(function fn(memo, spark) { @@ -81,9 +81,9 @@ module.exports = function forwards(primus) { }; /** - * Forward the message to a specific spark + * Forward the message to a specific spark. * - * @param {String} id Spark id + * @param {String} id Spark id. * @param {Mixed} msg Message to broadcast. * @param {Function} fn Completion callback. * @returns {Forward} @@ -92,7 +92,7 @@ module.exports = function forwards(primus) { forward.spark = function spark(id, msg, fn) { fn = fn || nope; - metroplex.spark(id, function spark(err, server) { + metroplex.servers(id, function spark(err, server) { if (err) return fn(err); forward(server, msg, id, fn); diff --git a/test/integration.test.js b/test/integration.test.js index 42a660c..1e1833c 100644 --- a/test/integration.test.js +++ b/test/integration.test.js @@ -157,11 +157,13 @@ describe('plugin', function () { http.listen(portnumber); }); - it('finds servers for a list of sparks', function (next) { + it('finds the servers for a list of sparks', function (next) { server.use('metroplex', metroplex); server2.use('metroplex', metroplex); - var clients = [] + var Socket2 = server2.Socket + , Socket = server.Socket + , clients = [] , length = 10; function push(address) { @@ -169,7 +171,7 @@ describe('plugin', function () { clients.push({ id: id, address: address }); if (clients.length === length) { - server.metroplex.sparks(clients.map(function (client) { + server.metroplex.servers(clients.map(function (client) { return client.id; }), function (err, addresses) { if (err) return next(err); @@ -186,11 +188,42 @@ describe('plugin', function () { server.once('register', function (address) { var len = length / 2; - while (len--) new server.Socket(address).id(push(address)); + while (len--) new Socket(address).id(push(address)); }); server2.once('register', function (address) { var len = length / 2; - while (len--) new server2.Socket(address).id(push(address)); + while (len--) new Socket2(address).id(push(address)); + }); + }); + + it('finds the sparks in a server', function (next) { + server.use('metroplex', metroplex); + + var Socket = server.Socket + , clients = [] + , length = 10; + + function push(address) { + return function (id) { + clients.push(id); + + if (clients.length === length) { + server.metroplex.sparks(address, function (err, sparks) { + if (err) return next(err); + + clients.forEach(function (id) { + assume(sparks.indexOf(id)).is.above(-1); + }); + + next(); + }); + } + }; + } + + server.once('register', function (address) { + var len = length; + while (len--) new Socket(address).id(push(address)); }); }); });