Skip to content

Commit

Permalink
Added a bunch of scaffolding for the cluster implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
mallocator committed Jul 26, 2016
1 parent 95412af commit e36cdd7
Show file tree
Hide file tree
Showing 13 changed files with 569 additions and 23 deletions.
5 changes: 5 additions & 0 deletions README.md
Expand Up @@ -28,3 +28,8 @@ nanomsg is a socket library that provides several common communication patterns.
# API

A detailed documentation will come, until then check out the [examples](test/examples.js) in the test directory.


# Future Development

* A monitor tool that shows what services/nodes are available on the cluster and general cluster information
1 change: 1 addition & 0 deletions index.js
Expand Up @@ -135,6 +135,7 @@ class Netz {
shutdown() {
// TODO clean up existing services and unregister them from the cluster before closing down.
this._discovery.stop();
this._context._cluster.stop();
}
}

Expand Down
68 changes: 66 additions & 2 deletions lib/Node.js
@@ -1,15 +1,26 @@
var Service = require('./Service');

class Node {
/**
* @param {Context} context
* @param {String} host
* @param {Object.<String, Service>} services
* @param {Object.<String, Service>} [services={}]
* @param {Number} [connections=0]
*/
constructor(context, host, services = {}) {
constructor(context, host, services = {}, connections = 0) {
this._host = host;
this._services = services;
this._connections = connections;
context.on('serviceAdded', this._onServiceAdded.bind(this));
context.on('serviceRemoved', this._onServiceRemoved.bind(this));
}

/**
* Returns a unique id, which is equivalent to the host the node runs on.
* @returns {String}
*/
get id() {
return this._host;
}

/**
Expand All @@ -28,6 +39,59 @@ class Node {
return this._services;
}

/**
* Returns the number of connections to other nodes this node has.
* @returns {Number}
*/
get connections() {
return this._connections;
}

/**
* Set the number of connections this node is connected to.
* @param {Number} val
*/
set connections(val) {
this._connections = val;
}

/**
* Sets the address on which the bus for this node is listening.
* @param {String} address
*/
set bus(address) {
this._bus = address;
}

/**
* Returns the address of the bus on which this node can be reached
* @returns {String}
*/
get bus() {
return this._bus;
}

/**
* Returns the node as a json document.
*/
get json() {
return {
host: this._host,
bus: this._bus,
cons: this._connections,
svc: this._services.map(service => service.json)
}
}

/**
* Returns a node instance from a given json document.
* @param json
*/
static parse(context, json) {
let services = json.svc.map(service => Service.parse(context, service));
return new Node(context, json.host, services, json.connections);
}

/**
* Returns the service with the given name, or undefined if it wasn't found.
* @param {String} name
Expand Down
55 changes: 55 additions & 0 deletions lib/NodeManager.js
@@ -1,12 +1,19 @@
var _ = require('lodash');

var Node = require('./Node');


/**
* The node manager is a central location to look up nodes.
*/
class NodeManager {
/**
* @param {Context} context
*/
constructor(context) {
this._self = new Node(context, context.listen.substr(0, context.listen.lastIndexOf(':')));
this._nodes = {};
// TODO react to nodeAdded/Removed/Changed and update the information we have.
}

/**
Expand All @@ -15,6 +22,54 @@ class NodeManager {
get self() {
return this._self;
}

/**
* Adds one or more nodes to the list of known nodes.
* @param {Node|Node[]} nodes
*/
add(nodes) {
nodes = _.isArray(nodes) ? nodes : [nodes];
for (let node of nodes) {
this._nodes[node.id] = node;
}
}

/**
* Returns the node with the given id.
* @param {String} id
* @returns {Node}
*/
get(id) {
return this._nodes[id];
}

/**
* Removes a node from the list of known nodes.
* @param {String|Node} node
*/
remove(node) {
delete this._nodes[node instanceof Node ? node.id : node];
}

/**
* Returns a list with all known nodes.
* @returns {Node[]}
*/
get list() {
let list = [];
for (let prop in this._nodes) {
list.push(this._nodes[prop]);
}
return list;
}

/**
* Returns a map with all known nodes mapped by id.
* @returns {Object.<String, Node>}
*/
get map() {
return this._nodes;
}
}

module.exports = NodeManager;
99 changes: 82 additions & 17 deletions lib/Options.js
Expand Up @@ -4,6 +4,7 @@ var _ = require('lodash');
var EventEmitter = require('eventemitter2');
var shortId = require('shortid');

var Cluster = require('./cluster');
var NodeManager = require('./NodeManager');
var ServiceManager = require('./ServiceManager');

Expand Down Expand Up @@ -114,6 +115,7 @@ class Context extends Validator {
this.__proto__ = new EventEmitter({ newListener: false, maxListeners: 100 });
this._services = new ServiceManager(this);
this._nodes = new NodeManager(this);
this._cluster = new Cluster(this);
}

/**
Expand All @@ -125,9 +127,7 @@ class Context extends Validator {
discovery: {
type: 'broadcast'
},
cluster: {
name: 'default'
}
debug: () => {}
}
}

Expand All @@ -149,6 +149,16 @@ class Context extends Validator {
return new Ports(val);
}

/**
* Makes sure that the cluster has a name.
* @param {Object} val
* @param {String} val.name
* @returns {Object}
*/
cluster(val) {
return new ClusterOpts(val);
}

/**
* Makes sure the connection string for broadcasts is something zeromq can understand.
*/
Expand All @@ -162,19 +172,6 @@ class Context extends Validator {
return val;
}

/**
* Makes sure that the cluster has a name.
* @param {Object} val
* @param {String} val.name
* @returns {Object}
*/
cluster(val) {
if (!val || !val.name) {
throw new Error('Cluster needs to have a name')
}
return val;
}

/**
* @param {Object} val
* @returns {DiscoveryOpts}
Expand Down Expand Up @@ -211,7 +208,75 @@ class Context extends Validator {
if (val) {
return console.log;
}
return null;
return () => {};
}
}


/**
* Verifies all options for the cluster.
*/
class ClusterOpts extends Validator {
get default() {
return {
name: 'default',
peers: 3,
history: {
ttl: 300000,
size: 1000
}
}
}

/**
* Each cluster needs a name.
* @param {String} val
* @returns {String}
*/
name(val) {
if (!val || !val.trim().length) {
throw new Error('Cluster needs to have a name')
}
return val.trim();
}

/**
* Checks that the number of peers each node connects to is at least 1.
* @param {String|Number} val
* @returns {Number}
*/
peers(val) {
if (!val) {
throw new Error('Cluster needs a peer limit');
}
val = parseInt(val);
if (val < 1) {
throw new Error('Cluster peer limit needs to be set at 1 or higher');
}
return val;
}

/**
* Checks that the history settings make sense.
* @param val
* @returns {*}
*/
history(val) {
if (!val.ttl) {
throw new Error('Cluster history needs a ttl to work with');
}
if (!val.size) {
throw new Error('Cluster history needs a maximum size to work with');
}
val.ttl = parseInt(val.ttl);
val.size = parseInt(val.size);
if (val.ttl < 1) {
throw new Error('The history ttl needs to be a positive number');
}
if (val.size < 1) {
throw new Error('The history size needs to be at least 1');
}
return val;
}
}

Expand Down
15 changes: 15 additions & 0 deletions lib/Service.js
Expand Up @@ -76,6 +76,21 @@ class Service extends EventEmitter {
return this.node.host + ':' + this.port;
}

/**
* Returns the service as a json document.
*/
get json() {
// TODO return each service as something that can be parsed on the other side
}

/**
* Returns a service object parsed from a json document
* @param json
*/
static parse(context, json) {
// TODO parse whatever came from the other side
}

/**
* Returns true if the other service can interact with this service. At service level this check verifies that
* both services use the same service name. Inheriting classes should check for further properties.
Expand Down

0 comments on commit e36cdd7

Please sign in to comment.