From e36cdd76bc1bc5d3dd9e667388a3286828c677a0 Mon Sep 17 00:00:00 2001 From: Ravi Gairola Date: Mon, 25 Jul 2016 19:35:20 -0700 Subject: [PATCH] Added a bunch of scaffolding for the cluster implementation --- README.md | 5 + index.js | 1 + lib/Node.js | 68 ++++++++++++- lib/NodeManager.js | 55 ++++++++++ lib/Options.js | 99 ++++++++++++++---- lib/Service.js | 15 +++ lib/cluster/history.js | 82 +++++++++++++++ lib/cluster/index.js | 178 +++++++++++++++++++++++++++++++++ lib/cluster/message.js | 18 ++++ test/Options.test.js | 11 +- test/cluster/history.test.js | 56 +++++++++++ test/discovery/local.test.js | 1 + test/discovery/unicast.test.js | 3 +- 13 files changed, 569 insertions(+), 23 deletions(-) create mode 100644 lib/cluster/history.js create mode 100644 lib/cluster/index.js create mode 100644 lib/cluster/message.js create mode 100644 test/cluster/history.test.js diff --git a/README.md b/README.md index 7b9c740..153db21 100644 --- a/README.md +++ b/README.md @@ -28,3 +28,8 @@ nanomsg is a socket library that provides several common communication patterns. # API A detailed documentation will come, until then check out the [examples](test/examples.js) in the test directory. + + +# Future Development + +* A monitor tool that shows what services/nodes are available on the cluster and general cluster information diff --git a/index.js b/index.js index 9b3a53b..f2e6a9b 100644 --- a/index.js +++ b/index.js @@ -135,6 +135,7 @@ class Netz { shutdown() { // TODO clean up existing services and unregister them from the cluster before closing down. this._discovery.stop(); + this._context._cluster.stop(); } } diff --git a/lib/Node.js b/lib/Node.js index b64f1d0..c066106 100644 --- a/lib/Node.js +++ b/lib/Node.js @@ -1,15 +1,26 @@ +var Service = require('./Service'); + class Node { /** * @param {Context} context * @param {String} host - * @param {Object.} services + * @param {Object.} [services={}] + * @param {Number} [connections=0] */ - constructor(context, host, services = {}) { + constructor(context, host, services = {}, connections = 0) { this._host = host; this._services = services; + this._connections = connections; context.on('serviceAdded', this._onServiceAdded.bind(this)); context.on('serviceRemoved', this._onServiceRemoved.bind(this)); + } + /** + * Returns a unique id, which is equivalent to the host the node runs on. + * @returns {String} + */ + get id() { + return this._host; } /** @@ -28,6 +39,59 @@ class Node { return this._services; } + /** + * Returns the number of connections to other nodes this node has. + * @returns {Number} + */ + get connections() { + return this._connections; + } + + /** + * Set the number of connections this node is connected to. + * @param {Number} val + */ + set connections(val) { + this._connections = val; + } + + /** + * Sets the address on which the bus for this node is listening. + * @param {String} address + */ + set bus(address) { + this._bus = address; + } + + /** + * Returns the address of the bus on which this node can be reached + * @returns {String} + */ + get bus() { + return this._bus; + } + + /** + * Returns the node as a json document. + */ + get json() { + return { + host: this._host, + bus: this._bus, + cons: this._connections, + svc: this._services.map(service => service.json) + } + } + + /** + * Returns a node instance from a given json document. 
+ * @param json + */ + static parse(context, json) { + let services = json.svc.map(service => Service.parse(context, service)); + return new Node(context, json.host, services, json.connections); + } + /** * Returns the service with the given name, or undefined if it wasn't found. * @param {String} name diff --git a/lib/NodeManager.js b/lib/NodeManager.js index 64201b0..cf9206b 100644 --- a/lib/NodeManager.js +++ b/lib/NodeManager.js @@ -1,12 +1,19 @@ +var _ = require('lodash'); + var Node = require('./Node'); +/** + * The node manager is a central location to look up nodes. + */ class NodeManager { /** * @param {Context} context */ constructor(context) { this._self = new Node(context, context.listen.substr(0, context.listen.lastIndexOf(':'))); + this._nodes = {}; + // TODO react to nodeAdded/Removed/Changed and update the information we have. } /** @@ -15,6 +22,54 @@ class NodeManager { get self() { return this._self; } + + /** + * Adds one or more nodes to the list of known nodes. + * @param {Node|Node[]} nodes + */ + add(nodes) { + nodes = _.isArray(nodes) ? nodes : [nodes]; + for (let node of nodes) { + this._nodes[node.id] = node; + } + } + + /** + * Returns the node with the given id. + * @param {String} id + * @returns {Node} + */ + get(id) { + return this._nodes[id]; + } + + /** + * Removes a node from the list of known nodes. + * @param {String|Node} node + */ + remove(node) { + delete this._nodes[node instanceof Node ? node.id : node]; + } + + /** + * Returns a list with all known nodes. + * @returns {Node[]} + */ + get list() { + let list = []; + for (let prop in this._nodes) { + list.push(this._nodes[prop]); + } + return list; + } + + /** + * Returns a map with all known nodes mapped by id. + * @returns {Object.} + */ + get map() { + return this._nodes; + } } module.exports = NodeManager; diff --git a/lib/Options.js b/lib/Options.js index 61d022c..1336054 100644 --- a/lib/Options.js +++ b/lib/Options.js @@ -4,6 +4,7 @@ var _ = require('lodash'); var EventEmitter = require('eventemitter2'); var shortId = require('shortid'); +var Cluster = require('./cluster'); var NodeManager = require('./NodeManager'); var ServiceManager = require('./ServiceManager'); @@ -114,6 +115,7 @@ class Context extends Validator { this.__proto__ = new EventEmitter({ newListener: false, maxListeners: 100 }); this._services = new ServiceManager(this); this._nodes = new NodeManager(this); + this._cluster = new Cluster(this); } /** @@ -125,9 +127,7 @@ class Context extends Validator { discovery: { type: 'broadcast' }, - cluster: { - name: 'default' - } + debug: () => {} } } @@ -149,6 +149,16 @@ class Context extends Validator { return new Ports(val); } + /** + * Makes sure that the cluster has a name. + * @param {Object} val + * @param {String} val.name + * @returns {Object} + */ + cluster(val) { + return new ClusterOpts(val); + } + /** * Makes sure the connection string for broadcasts is something zeromq can understand. */ @@ -162,19 +172,6 @@ class Context extends Validator { return val; } - /** - * Makes sure that the cluster has a name. - * @param {Object} val - * @param {String} val.name - * @returns {Object} - */ - cluster(val) { - if (!val || !val.name) { - throw new Error('Cluster needs to have a name') - } - return val; - } - /** * @param {Object} val * @returns {DiscoveryOpts} @@ -211,7 +208,75 @@ class Context extends Validator { if (val) { return console.log; } - return null; + return () => {}; + } +} + + +/** + * Verifies all options for the cluster. 
+ */ +class ClusterOpts extends Validator { + get default() { + return { + name: 'default', + peers: 3, + history: { + ttl: 300000, + size: 1000 + } + } + } + + /** + * Each cluster needs a name. + * @param {String} val + * @returns {String} + */ + name(val) { + if (!val || !val.trim().length) { + throw new Error('Cluster needs to have a name') + } + return val.trim(); + } + + /** + * Checks that the number of peers each node connects to is at least 1. + * @param {String|Number} val + * @returns {Number} + */ + peers(val) { + if (!val) { + throw new Error('Cluster needs a peer limit'); + } + val = parseInt(val); + if (val < 1) { + throw new Error('Cluster peer limit needs to be set at 1 or higher'); + } + return val; + } + + /** + * Checks that the history settings make sense. + * @param val + * @returns {*} + */ + history(val) { + if (!val.ttl) { + throw new Error('Cluster history needs a ttl to work with'); + } + if (!val.size) { + throw new Error('Cluster history needs a maximum size to work with'); + } + val.ttl = parseInt(val.ttl); + val.size = parseInt(val.size); + if (val.ttl < 1) { + throw new Error('The history ttl needs to be a positive number'); + } + if (val.size < 1) { + throw new Error('The history size needs to be at least 1'); + } + return val; } } diff --git a/lib/Service.js b/lib/Service.js index c9f9ba8..8adfec8 100644 --- a/lib/Service.js +++ b/lib/Service.js @@ -76,6 +76,21 @@ class Service extends EventEmitter { return this.node.host + ':' + this.port; } + /** + * Returns the service as a json document. + */ + get json() { + // TODO return each service as something that can be parsed on the other side + } + + /** + * Returns a service object parsed from a json document + * @param json + */ + static parse(context, json) { + // TODO parse whatever came from the other side + } + /** * Returns true if the other service can interact with this service. At service level this check verifies that * both services use the same service name. Inheriting classes should check for further properties. diff --git a/lib/cluster/history.js b/lib/cluster/history.js new file mode 100644 index 0000000..e85125e --- /dev/null +++ b/lib/cluster/history.js @@ -0,0 +1,82 @@ +var Message = require('./message'); + +/** + * A history id entry that will delete itself after a given time. + */ +class IdEntry { + /** + * + * @param {IdHistory} history A reference to the history object that holds all entries + * @param {String} id A unique id that is used to find this entry later again + * @param {Number} ttl The time how long this entry should exist in ms. + */ + constructor(history, id, ttl) { + this._id = id; + this._timeout = setTimeout(() => history.remove(id), ttl); + } + + /** + * Return this id of this entry. + * @returns {String} + */ + get id() { + return this._id; + } + + /** + * Clears the timeout that would clean this object by itself. + */ + stop() { + clearTimeout(this._timeout); + } +} + +/** + * A helper collection used to store message ids that will clean itself over time. + */ +class IdHistory { + /** + * @param {Context} context + */ + constructor(context) { + this._context = context; + this._map = {}; + this._queue = []; + } + + /** + * Add an id to the history. It will be removed after the configured ttl automatically. + * @param {Message|String} id + */ + add(id) { + id = id instanceof Message ? 
id.id : id;
+    let entry = new IdEntry(this, id, this._context.cluster.history.ttl);
+    this._map[id] = entry;
+    this._queue.push(entry);
+    if (this._queue.length > this._context.cluster.history.size) {
+      let purged = this._queue.shift();
+      delete this._map[purged.id];
+    }
+  }
+
+  /**
+   * Removes the given id from history.
+   * @param id
+   */
+  remove(id) {
+    this._map[id] && this._map[id].stop();
+    delete this._map[id];
+    this._queue = this._queue.filter(elem => elem.id !== id);
+  }
+
+  /**
+   * Check if an id is recorded in the history.
+   * @param {String} id The id to check for
+   * @returns {boolean} Return true if it exists, otherwise false.
+   */
+  getById(id) {
+    return !!this._map[id];
+  }
+}
+
+module.exports = IdHistory;
diff --git a/lib/cluster/index.js b/lib/cluster/index.js
new file mode 100644
index 0000000..75a49a8
--- /dev/null
+++ b/lib/cluster/index.js
@@ -0,0 +1,178 @@
+var nanomsg = require('nanomsg');
+var shortId = require('shortid');
+
+var History = require('./history');
+
+
+class Cluster {
+  constructor(context) {
+    this._context = context;
+    this._history = new History(context);
+    this._pair = nanomsg.socket('pair');
+    this._pair.on('message', this._onConnecting.bind(this));
+    this._bus = nanomsg.socket('bus');
+    this._bus.on('message', this._onMessage.bind(this));
+    context.on('discovered', this._onDiscovered.bind(this));
+    context.on('nodeAdded', this._onNodeAdded.bind(this));
+    context.on('nodeRemoved', this._onNodeRemoved.bind(this));
+    this._bind(this._pair);
+  }
+
+  /**
+   * Will bind a given socket to an available port as close as possible to the listen configuration.
+   * @param socket
+   * @private
+   */
+  _bind(socket) {
+    let address = this._context.listen;
+    let i = 0;
+    while (i++ < 1000) {
+      try {
+        socket.bind(address);
+        return this._context.debug('Cluster is now listening for incoming nodes at', address);
+      } catch (e) {
+        if (e.message == 'Address already in use') {
+          address = address.replace(/(\d+)$/, (match, number) => parseInt(number) + 1);
+        } else {
+          throw e;
+        }
+      }
+    }
+    throw new Error('Cluster is unable to listen for incoming messages');
+  }
+
+  /**
+   * Event handler that reacts to a node being discovered. Once a node has been discovered we send it a list of known
+   * nodes on the network.
+   * @param {String[]} nodes A list of node addresses that this node can connect to using nanomsg.
+   * @private
+   */
+  _onDiscovered(nodes) {
+    for (let node of nodes) {
+      try {
+        this._pair.connect(node);
+        this._pair.send(new Buffer(JSON.stringify(this._context._nodes.list), 'utf8'));
+        this._pair.close();
+        return;
+      } catch(e) {
+        this._context.debug('Unable to connect to node', node);
+      }
+    }
+    this._context.debug('A node was discovered but we were unable to connect to it.');
+  }
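+
+  /*
+   * Rough usage sketch for the handshake above and below (illustration only, not part of the
+   * implementation; option names follow lib/Options.js and the listen address is made up):
+   *
+   *   var Options = require('../Options');
+   *   // The context constructor creates this Cluster, which binds the pair socket and then
+   *   // waits for the discovery layer to emit 'discovered' with a list of node addresses.
+   *   var context = new Options({
+   *     listen: 'tcp://127.0.0.1:12345',
+   *     debug: () => {},
+   *     cluster: { name: 'default', peers: 3, history: { ttl: 300000, size: 1000 } }
+   *   });
+   *   // An existing node answers the pairing request with its list of known nodes
+   *   // (_onDiscovered above); the new node then picks its bus peers in _onConnecting below.
+   */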
+
+  /**
+   * Event handler for messages on the pairing socket. The buffer contains a list of nodes sent by _onDiscovered().
+   * Once the discovery is done this node will close the pair socket and instead open a bus socket to communicate with
+   * multiple nodes in the cluster.
+   * @param {Buffer} buffer
+   * @private
+   */
+  _onConnecting(buffer) {
+    let list = JSON.parse(buffer.toString('utf8'));
+    this._context._nodes.add(list);
+    this._pair.close();
+    list.sort((a, b) => a.connections - b.connections);
+    this._bind(this._bus);
+    for (let i = 0; i < this._context.cluster.peers && i < list.length; i++) {
+      this._bus.connect(list[i].bus);
+      // TODO other nodes need to know that they are connected to this node now, so do a broadcast increasing their connection number or find a way to count bus connections
+      this._context._nodes.self.connections++;
+    }
+    this.send('nodeAdded', this._context._nodes.self);
+  }
+
+  /**
+   * Sends a message to all nodes in the cluster
+   * @param {String} event The event that should be fired on the other nodes
+   * @param {*} payload The payload that should be passed on
+   */
+  send(event, ...payload) {
+    let message = {
+      id: shortId.generate(),
+      event,
+      origin: this._context._nodes.self.id
+    };
+    switch(event) {
+      case 'serviceAdded':
+      case 'serviceRemoved':
+      case 'nodeAdded':
+      case 'nodeChanged':
+      case 'nodeRemoved':
+        message.payload = payload[0].json;
+        break;
+      default: throw new Error('Unknown event, will not send to rest of cluster');
+    }
+    this._bus.send(new Buffer(JSON.stringify(message), 'utf8'));
+  }
+
+  /**
+   * TODO Node communication
+   * On send
+   * - generate message id
+   * - send to all connected nodes
+   *
+   * On receive
+   * - check history for message
+   * - if no history of message
+   *   => send message to all other peers
+   * - clean history of id after N seconds
+   */
+  /**
+   * The general message handler that receives messages from an established cluster
+   * @param {Buffer} buffer
+   * @private
+   */
+  _onMessage(buffer) {
+    let message = JSON.parse(buffer.toString('utf8'));
+    switch (message.event) {
+      case 'serviceAdded':
+      case 'serviceRemoved':
+      case 'nodeAdded':
+      case 'nodeChanged':
+      case 'nodeRemoved':
+        break;
+      default: throw new Error('Unknown event, will not send to rest of cluster');
+    }
+    this._context.emit(message.event, message.payload);
+    // TODO check history and send to other nodes.
+  }
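+
+  /*
+   * Sketch of the history check that the TODO above describes (illustration only, not wired up
+   * yet; it assumes the IdHistory API from ./history.js):
+   *
+   *   _onMessage(buffer) {
+   *     let message = JSON.parse(buffer.toString('utf8'));
+   *     if (this._history.getById(message.id)) {
+   *       return;                        // seen this id before, don't emit or forward it again
+   *     }
+   *     this._history.add(message.id);   // remembered for cluster.history.ttl milliseconds
+   *     this._bus.send(buffer);          // pass the original buffer on to our own peers
+   *     this._context.emit(message.event, message.payload);
+   *   }
+   */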
+
+  /**
+   * Node added handler that will connect to the new node if we don't have as many peer connections as desired yet.
+   * @param {Node} node
+   * @private
+   */
+  _onNodeAdded(node) {
+    if (this._context._nodes.self.connections < this._context.cluster.peers) {
+      this._bus.connect(node.bus);
+      this._context._nodes.self.connections++;
+    }
+  }
+
+  /**
+   * If the removed node is one that we are connected to then we need to check if we need to connect to more nodes
+   * @param {Node} node
+   * @private
+   */
+  _onNodeRemoved(node) {
+    if (this._context._nodes.self.connections < this._context.cluster.peers) {
+
+    }
+  }
+
+  /**
+   * TODO Node disconnects (might be same node removed)
+   * react to node has disconnected involuntarily
+   * -> Send other nodeRemoved info to rest of cluster
+   * -> if number of connections is < N
+   *    => Connect to existing node with least connections
+   * -> Send own nodeChanged info to rest of cluster
+   */
+
+  stop() {
+    this._bus && this._bus.close();
+  }
+}
+
+module.exports = Cluster;
diff --git a/lib/cluster/message.js b/lib/cluster/message.js
new file mode 100644
index 0000000..281ad40
--- /dev/null
+++ b/lib/cluster/message.js
@@ -0,0 +1,18 @@
+var shortId = require('shortid');
+
+class Message {
+  constructor(payload, id = shortId.generate()) {
+    this._payload = payload;
+    this._id = id;
+  }
+
+  get id() {
+    return this._id;
+  }
+
+  get payload() {
+    return this._payload;
+  }
+}
+
+module.exports = Message;
diff --git a/test/Options.test.js b/test/Options.test.js
index b30a809..89cf13b 100644
--- a/test/Options.test.js
+++ b/test/Options.test.js
@@ -10,6 +10,7 @@ describe('Options', () => {
         min: 30000,
         max: '31000'
       },
+      debug: () => {},
       someComplexOption: {
         thatIsNotValidated: true
       }
@@ -24,6 +25,7 @@ describe('Options', () => {
     expect(options.ports.other).to.equal(29000);
     // Allow complex objects to be passed in even without validation
     expect(options.someComplexOption.thatIsNotValidated).to.equal(true);
+    options._cluster.stop();
   });

   it('should validate that port ranges are valid', () => {
@@ -31,19 +33,22 @@ describe('Options', () => {
       ports: {
         min: 10,
         max: 9
-      }
+      },
+      debug: () => {}
     })).to.throw(Error);
   });

   it('should validate that defaults are being set properly', () => {
-    var options = new Options({});
+    var options = new Options({debug: () => {}});
     expect(options.ports.min).to.equal(40000);
     expect(options.ports.max).to.equal(50000);
+    options._cluster.stop();
   });

   it('should be an event emitter', done => {
-    var options = new Options();
+    var options = new Options({debug: () => {}});
     options.once('test', () => {
+      options._cluster.stop();
       done();
     });
     options.emit('test');
diff --git a/test/cluster/history.test.js b/test/cluster/history.test.js
new file mode 100644
index 0000000..88eae5e
--- /dev/null
+++ b/test/cluster/history.test.js
@@ -0,0 +1,56 @@
+/* global describe, it, beforeEach, afterEach */
+let expect = require('chai').expect;
+
+let History = require('../../lib/cluster/history');
+
+
+describe('History', () => {
+  it('should create entries that are accessible immediately', () => {
+    let history = new History({
+      cluster: {
+        history: {
+          ttl: 1000,
+          size: 10
+        }
+      }
+    });
+
+    history.add('testid');
+    expect(history.getById('testid')).to.be.true;
+  });
+
+  it('should purge entries that are older than the given ttl', done => {
+    let history = new History({
+      cluster: {
+        history: {
+          ttl: 10,
+          size: 10
+        }
+      }
+    });
+
+    history.add('testid');
+    setTimeout(() => {
+      expect(history.getById('testid')).to.be.false;
+      done();
+    }, 15);
+  });
+
+  it('should purge entries if the size limit has been reached', () => {
+    let history = new History({
+      cluster: {
+        history: {
+ ttl: 1000, + size: 3 + } + } + }); + + history.add('testid1'); + history.add('testid2'); + history.add('testid3'); + history.add('testid4'); + expect(history.getById('testid1')).to.be.false; + expect(history.getById('testid2')).to.be.true; + }); +}); diff --git a/test/discovery/local.test.js b/test/discovery/local.test.js index edb2139..ef197a8 100644 --- a/test/discovery/local.test.js +++ b/test/discovery/local.test.js @@ -21,6 +21,7 @@ describe('discovery.local', () => { it('should emit the discovered event', done => { let context = new Options({ listen: 'tcp://127.0.0.1:12345', + debug: () => {}, discovery: { type: 'local', file: tmpFile, diff --git a/test/discovery/unicast.test.js b/test/discovery/unicast.test.js index 25e9117..72064d2 100644 --- a/test/discovery/unicast.test.js +++ b/test/discovery/unicast.test.js @@ -11,7 +11,8 @@ describe('discovery.unicast', () => { discovery: { type: 'unicast', hosts: ['tcp://1.0.0.0:1', 'tcp://1.0.0.0:2'] - } + }, + debug: () => {} }); context.once('discovered', nodes => {