Skip to content
Permalink
Browse files

backout MANTA-3223: needs more work

  • Loading branch information
IanWyszynski committed Jan 10, 2018
1 parent 9cc905f commit f0ea512aa6160616d3cce9e5a68c7acde2ca8fab
Showing with 405 additions and 431 deletions.
  1. +2 −2 Makefile
  2. +11 −11 README.md
  3. +2 −2 etc/config.coal.json
  4. +130 −144 lib/index.js
  5. +75 −140 lib/register.js
  6. +135 −0 lib/zk.js
  7. +49 −131 main.js
  8. +1 −1 package.json
@@ -5,7 +5,7 @@
#

#
# Copyright (c) 2018, Joyent, Inc.
# Copyright (c) 2017, Joyent, Inc.
#

#
@@ -36,7 +36,7 @@ SMF_MANIFESTS_IN = smf/manifests/registrar.xml.in
# Variables
#
NODE_PREBUILT_TAG = zone
NODE_PREBUILT_VERSION := v0.12.9
NODE_PREBUILT_VERSION := v0.10.26
NODE_PREBUILT_IMAGE = fd2cc906-8938-11e3-beab-4359c665ac99

# RELENG-341: no npm cache is making builds unreliable
@@ -188,9 +188,9 @@ not recommended.
**zookeeper:** Service discovery records are maintained in a
ZooKeeper cluster. The `"zookeeper"` top-level property describes how to reach
that cluster. This should be a configuration block appropriate for
[node-zkstream](http://github.com/joyent/node-zkstream). See that project for
details, but there's an example below that includes `"sessionTimeout"` and
`"servers"` properties.
[node-zkplus](http://github.com/mcavage/node-zkplus). See that project for
details, but there's an example below that includes `"timeout"` and `"servers"`
properties.

**registration:** The `"registration"` object describes the service discovery
records that will be inserted into ZooKeeper. These control the DNS names that
@@ -320,10 +320,10 @@ record. This is not common.
},
"adminIp": "172.27.10.72",
"zookeeper": {
"sessionTimeout": 60000,
"servers": [ { "address": "172.27.10.35", "port": 2181 },
{ "address": "172.27.10.32", "port": 2181 },
{ "address": "172.27.10.33", "port": 2181 } ]
"timeout": 60000,
"servers": [ { "host": "172.27.10.35", "port": 2181 },
{ "host": "172.27.10.32", "port": 2181 },
{ "host": "172.27.10.33", "port": 2181 } ]
}
}

@@ -392,10 +392,10 @@ Let's augment the configuration above to specify a service record:
},
"adminIp": "172.27.10.72",
"zookeeper": {
"sessionTimeout": 60000,
"servers": [ { "address": "172.27.10.35", "port": 2181 },
{ "address": "172.27.10.32", "port": 2181 },
{ "address": "172.27.10.33", "port": 2181 } ]
"timeout": 60000,
"servers": [ { "host": "172.27.10.35", "port": 2181 },
{ "host": "172.27.10.32", "port": 2181 },
{ "host": "172.27.10.33", "port": 2181 } ]
}
}
@@ -9,10 +9,10 @@
"zookeeper": {
"connectTimeout": 1000,
"servers": [ {
"address": "10.99.99.11",
"host": "10.99.99.11",
"port": 2181
} ],
"sessionTimeout": 30000
"timeout": 6000
},
"maxAttempts": 10
}
@@ -5,16 +5,16 @@
*/

/*
* Copyright (c) 2018, Joyent, Inc.
* Copyright (c) 2014, Joyent, Inc.
*/

var EventEmitter = require('events').EventEmitter;

var assert = require('assert-plus');
var zkstream = require('zkstream');

var health = require('./health');
var register = require('./register');
var zk = require('./zk');



@@ -30,171 +30,157 @@ function _export(obj) {

///--- API

function createZKClient(opts, cb) {
assert.object(opts, 'options');
assert.object(opts.log, 'options.log');
assert.arrayOfObject(opts.servers, 'options.servers');
assert.func(cb, 'callback');

assert.ok((opts.servers.length > 0), 'options.servers empty');

opts.servers.forEach(function (s) {
assert.string(s.address, 'servers.address');
assert.number(s.port, 'servers.port');
});

cb(new zkstream.Client(opts));
}


function Registrar(opts) {
function register_plus(opts) {
assert.object(opts, 'options');
assert.object(opts.log, 'options.log');
assert.object(opts.zk, 'options.zk');

this.options = opts;
this.zk = opts.zk;
this.ephemerals = {};
this.check = null;
this.sessionGeneration = 0;
}

Registrar.prototype.registerOnNewSession = function (callback) {
var self = this;
var opts = self.options;
var log = self.log;

function registerCallback(err, znodes) {
if (!err) {
self.sessionGeneration++;
}
callback(err);
}

/*
* Final argument indicate that we are registering nodes as part of a new
* session, and not as part of recovery from health check failure.
*/
register.register(opts, this, registerCallback, true);
}

Registrar.prototype.getSessionGeneration = function () {
return (this.sessionGeneration);
};

Registrar.prototype.hasHealthCheck = function () {
return (this.check !== null);
}

Registrar.prototype.createHealthCheck = function () {
var opts = this.options;
var check;
var ee = new EventEmitter();
var log = opts.log.child({component: 'registrar'}, true);
var stop = false;
var zk_timer;
var zk = opts.zk;
var znodes;

var self = this;

function healthcheck() {
check = health.createHealthCheck(opts.healthCheck);
self.check = check;
var down = false;

check.on('data', function (obj) {
switch (obj.type) {
case 'ok':
if (down) {
ee.emit('ok');

register.register(opts, self, function (r_err, __znodes) {
if (r_err) {
log.debug(r_err, 'register: reregister failed');
ee.emit('error', r_err);
} else {
znodes = __znodes;
ee.emit('register', __znodes);
setImmediate(function () {
down = false;
});
}
});
}
break;

case 'fail':
if (obj.err && obj.isDown && !down) {
down = true;
var e = obj.err;
delete obj.err;
log.debug(e, {
check: obj,
znodes: znodes
}, 'healthcheck failed, unregistering')

ee.emit('fail', e);

var u_opts = {
log: log,
zk: zk,
};
register.unregister(u_opts, self, function (u_err) {
if (u_err) {
log.debug(u_err, 'healthcheck: unregister failed');
ee.emit('error', u_err);
} else {
ee.emit('unregister', e, znodes);
}
});
}
break;

default:
log.debug({
check: obj
}, 'healtcheck: unknown type encountered');
ee.emit('error',
new Error('unknown check type: ' + obj.type));
break;
}
});

check.on('error', function (err) {
log.debug(err, 'healtcheck: unexpected error');
register.register(opts, function (err, _znodes) {
if (err) {
log.debug(err, 'registration(%j): failed', cfg.registration);
ee.emit('error', err);
});
return;
}

check.on('end', function () {
log.debug('healthcheck: done');
});
znodes = _znodes;

function healthcheck() {
check = health.createHealthCheck(opts.healthCheck);
var down = false;

check.on('data', function (obj) {
switch (obj.type) {
case 'ok':
if (down) {
ee.emit('ok');

register.register(opts, function (r_err, __znodes) {
if (r_err) {
log.debug(r_err, 'register: reregister failed');
ee.emit('error', r_err);
} else {
znodes = __znodes;
ee.emit('register', __znodes);
setImmediate(function () {
down = false;
});
}
});
}
break;

case 'fail':
if (obj.err && obj.isDown && !down) {
down = true;
var e = obj.err;
delete obj.err;
log.debug(e, {
check: obj,
znodes: znodes
}, 'healthcheck failed, deregistering')

ee.emit('fail', e);

var u_opts = {
log: log,
zk: zk,
znodes: znodes
};
register.unregister(u_opts, function (u_err) {
if (u_err) {
log.debug(u_err, 'healthcheck: unregister failed');
ee.emit('error', u_err);
} else {
ee.emit('unregister', e, znodes);
}
});
}
break;

default:
log.debug({
check: obj
}, 'healtcheck: unknown type encountered');
ee.emit('error',
new Error('unknown check type: ' + obj.type));
break;
}
});

if (!stop)
check.start();
}
check.on('error', function (err) {
log.debug(err, 'healtcheck: unexpected error');
ee.emit('error', err);
});

if (opts.healthCheck)
healthcheck();
check.on('end', function () {
log.debug('healthcheck: done');
});

ee.stop = function () {
stop = true;
if (!stop)
check.start();
}

if (check)
check.stop();
self.check = null;
};
(function zkHeartbeat() {
var heartbeatInterval = opts.heartbeatInterval || 3000;
var hCfg = opts.heartbeat || {};

(function checkZK() {
log.debug('zk.heartbeat(%j): starting', znodes);
zk.heartbeat({nodes: znodes}, function (check_err) {
var _data;
var _event;
var _to;

if (check_err) {
log.debug(check_err, 'zk.heartbeat(%j) failed', znodes);
_data = check_err;
_event = 'heartbeatFailure';
_to = Math.max(heartbeatInterval, 60000);
} else {
log.debug('zk.heartbeat(%j): ok', znodes);
_data = znodes;
_event = 'heartbeat';
_to = heartbeatInterval;
}

if (!stop)
zk_timer = setTimeout(checkZK, _to);
ee.emit(_event, _data);
});
})();
})();

if (opts.healthCheck)
healthcheck();

ee.stop = function () {
stop = true;

if (check)
check.stop();

clearTimeout(zk_timer);
};

ee.emit('register', znodes);
});

return (ee);
}


///--- Exports

module.exports = {
createRegistrar: function (opts) {
return (new Registrar(opts));
},
createZKClient: createZKClient
};
module.exports = register_plus;

_export(health);
_export(register);
_export(zk);

0 comments on commit f0ea512

Please sign in to comment.
You can’t perform that action at this time.