Skip to content
Browse files

Merge remote-tracking branch 'upstream/master'

Conflicts:
	lib/client.js
	test/client.test.js
  • Loading branch information...
2 parents a36e077 + dc6eb83 commit 34f15b7bfc11c64cfe3d236cb7a54dde88ac2c69 Yunong Xiao committed Nov 14, 2012
Showing with 104 additions and 114 deletions.
  1. +90 −79 lib/client.js
  2. +2 −2 lib/error.js
  3. +5 −5 package.json
  4. +7 −28 test/client.test.js
View
169 lib/client.js
@@ -16,6 +16,13 @@ var ZKError = require('./error').ZKError;
var sprintf = util.format;
+var PROXY_EVENTS = [
+ 'connect',
+ 'not_connected',
+ 'close',
+ 'session_expired'
+];
+
///--- Helpers
@@ -51,6 +58,48 @@ function translateEvent(event) {
}
+function onZookeeperError(zh, p, code) {
+ var log = this.log;
+ log.trace({
+ zh: zh,
+ path: p,
+ code: code
+ }, 'zk error event');
+ switch (code) {
+
+ case ZK.ZSESSIONEXPIRED:
+ this.emit('error', new ZKError(code, 'ZK: session expired'));
+ break;
+
+ case ZK.ZINVALIDSTATE:
+ this.emit('error',
+ new ZKError(code, 'ZK: client in an invalid state'));
+ break;
+
+ case ZK.ZBADARGUMENTS:
+ this.emit('error',
+ new ZKError(code, 'ZK: invalid arguments'));
+ break;
+
+ case ZK.ZOPERATIONTIMEOUT:
+ log.trace('operation timed out, not emitting error');
+ break;
+
+ case ZK.ZSYSTEMERROR:
+ case ZK.ZRUNTIMEINCONSISTENCY:
+ case ZK.ZDATAINCONSISTENCY:
+ case ZK.ZMARSHALLINGERROR:
+ case ZK.ZUNIMPLEMENTED:
+ default:
+ log.fatal({
+ code: code
+ }, 'emitting unexpected err');
+ this.emit('error', new ZKError(code, 'ZK: unexpected error'));
+ break;
+ }
+}
+
+
///--- API
@@ -59,7 +108,6 @@ function ZKClient(options) {
assert.object(options.log, 'options.log');
assert.arrayOfObject(options.servers, 'options.servers');
assert.optionalNumber(options.timeout, 'options.timeout');
- assert.optionalNumber(options.connectTimeout, 'options.connectTimeout');
assert.optionalString(options.clientId, 'options.clientId');
assert.optionalString(options.clientPassword, 'options.clientPassword');
@@ -73,7 +121,6 @@ function ZKClient(options) {
this.clientPassword = options.clientPassword;
this.port = options.port;
this.servers = [];
- this.connectTimeout = options.connectTimeout;
this.timeout = options.timeout;
this.watchers = [];
@@ -99,103 +146,67 @@ function ZKClient(options) {
this.zk = new ZK(this.zookeeperOptions);
- this.zk.on('error', this.onError.bind(this));
- this.zk.on('session_expired', function () {
- self.log.error('session expired');
- self.zk.removeAllListeners();
- self.emit('session_expired', new ZKError(ZK.ZSESSIONEXPIRED));
- self.emit('error', new ZKError(ZK.ZSESSIONEXPIRED));
- self.close();
- });
- this.zk.once('close', this.onClose.bind(this));
- this.zk.on('not_connected', function () {
- self.emit('not_connected');
- });
- this.zk.on('connect', function () {
- self.emit('connect');
- });
-}
-util.inherits(ZKClient, EventEmitter);
-
-ZKClient.prototype.onError = function onError(zh, _path, code) {
- var self = this;
- var log = self.log;
- log.warn({
- zh: zh,
- path: _path,
- code: code
- }, 'got error');
- switch (code) {
- case ZK.ZCONNECTIONLOSS:
- case ZK.ZSYSTEMERROR:
- case ZK.ZRUNTIMEINCONSISTENCY:
- case ZK.ZDATAINCONSISTENCY:
- case ZK.ZMARSHALLINGERROR:
- case ZK.ZUNIMPLEMENTED:
- case ZK.ZBADARGUMENTS:
- case ZK.ZINVALIDSTATE:
- case ZK.ZSESSIONEXPIRED:
- self.emit('error', new ZKError(code));
- break;
- case ZK.ZOPERATIONTIMEOUT:
- log.warn('operation timed out, not emitting error');
- break;
- default:
- log.warn('got unknown error', code);
- break;
- }
-};
-ZKClient.prototype.onClose = function onClose() {
- var log = this.log;
- log.debug('zookeeper session closed by caller');
- this.zk.removeAllListeners();
- //remove all the watchers.
- this.watchers.forEach(function (w) {
- w.stop();
+ PROXY_EVENTS.forEach(function (ev) {
+ var proxy = self.emit.bind(self, ev);
+ self.zk.on(ev, function proxyEvent() {
+ self.log.trace('event: %s', ev);
+ proxy.apply(self, arguments);
+ });
});
- this.emit('close');
-};
+ this.zk.on('error', onZookeeperError.bind(this));
+}
+util.inherits(ZKClient, EventEmitter);
ZKClient.prototype.connect = function connect() {
var log = this.log;
var self = this;
var zk = this.zk;
- var connectTimeout = self.connectTimeout;
-
- log.trace('connecting');
- var timeoutId;
- if (connectTimeout) {
- timeoutId = setTimeout(function onConnectTimeout() {
- if (self.getState() !== 'connected') {
- self.removeAllListeners('connect');
- self.emit('error',
- new ZKError(ZK.ZCONNECTIONLOSS,
- 'connect timeout (' +
- connectTimeout +
- 'ms)'));
- self.close();
- }
- });
- }
+
+ log.trace('connect: entered');
+
zk.connect(function connectCallback() {
log.trace({
clientId: zk.client_id,
clientPassword: zk.client_password
- }, 'connected');
- clearTimeout(timeoutId);
+ }, 'connect: connected');
self.clientId = zk.client_id;
self.clientPassword = zk.client_password;
- // no need to emit connect here, as we node-zookeeper will emit
+ // no need to emit connect here, as node-zookeeper will emit
// a connect event, which gets re-emitted up at the constructor.
});
};
ZKClient.prototype.close = function close() {
- this.log.trace('closing');
- this.closedByClient = true;
+ var self = this;
+
+ this.log.trace('close: entered');
+
+ PROXY_EVENTS.forEach(function (ev) {
+ self.zk.removeAllListeners(ev);
+ });
+
+ this.removeAllListeners('connect');
+ this.removeAllListeners('connection_interrupted');
+ this.removeAllListeners('session_expired');
+
+ this.zk.once('close', function () {
+ self.watchers.forEach(function (w) {
+ w.stop();
+ });
+
+ self.zk.removeAllListeners('error');
+ self.removeAllListeners('error');
+
+ self.log.trace('close: done');
+ self.emit('close');
+ process.nextTick(function () {
+ self.removeAllListeners('close');
+ });
+ });
+
this.zk.close();
};
View
4 lib/error.js
@@ -10,9 +10,9 @@ function ZKError(code, message, constructorOpt) {
if (Error.captureStackTrace)
Error.captureStackTrace(this, constructorOpt || ZKError);
- this.name = 'ZooKeeperError';
- this.message = message || '';
this.code = code;
+ this.message = message || '';
+ this.name = 'ZooKeeperError';
}
util.inherits(ZKError, Error);
View
10 package.json
@@ -16,11 +16,11 @@
],
"main": "./lib/index.js",
"dependencies": {
- "assert-plus": "0.1.0",
- "bunyan": "0.13.5",
- "node-uuid": "1.3.3",
- "vasync": "1.3.1",
- "zookeeper": "git://github.com/yunong/node-zookeeper.git"
+ "assert-plus": "0.1.2",
+ "bunyan": "0.16.5",
+ "node-uuid": "1.4.0",
+ "vasync": "1.3.2",
+ "zookeeper": "git://github.com/yunong/node-zookeeper.git#71107cc"
},
"devDependencies": {
"cover": "0.2.8",
View
35 test/client.test.js
@@ -65,7 +65,7 @@ after(function (callback) {
console.error('Unable to clean up %s', ROOT);
process.exit(1);
}
- ZK.on('close', callback);
+ ZK.once('close', callback);
ZK.close();
});
});
@@ -256,6 +256,7 @@ test('watch (data+initialRead)', function (t) {
});
});
+
test('trigger close', function (t) {
var ZK2 = zk.createClient({
connectTimeout: false,
@@ -271,9 +272,10 @@ test('trigger close', function (t) {
t.end();
});
ZK2.connect();
- ZK2.zk.close();
+ ZK2.close();
});
+
test('connect to expired session', function (t) {
var ZK2 = zk.createClient({
connectTimeout: false,
@@ -287,18 +289,19 @@ test('connect to expired session', function (t) {
clientPassword: '9A9F0236749B498451DB8AD918491CAD'
});
ZK2.on('error', function (err) {
- t.equal(err.code, -112);
+ t.equal(err.code, zk.ZSESSIONEXPIRED);
+ ZK2.close();
t.end();
});
ZK2.connect();
});
+
test('connect to non-existent zk', function (t) {
var ZK2 = zk.createClient({
log: helper.createLogger('zk.client.test.js'),
servers: [ {
host: 'localhost',
- // note port = 9999
port: 9999
}],
timeout: 5000
@@ -315,27 +318,3 @@ test('connect to non-existent zk', function (t) {
ZK2.connect();
});
-test('connect to artificial connection timeout', function (t) {
- var ZK2 = zk.createClient({
- log: helper.createLogger('zk.client.test.js'),
- servers: [ {
- host: 'localhost',
- // note port = 9999
- port: 9999
- }],
- timeout: 5000,
- connectTimeout: 1000
- });
- var time = Date.now();
- ZK2.on('error', function (err) {
- t.equal(err.code, -4);
- var interval = Date.now() - time;
- if (interval < 2000) {
- t.end();
- } else {
- t.ok(false, 'connect did not timeout in time');
- t.end();
- }
- });
- ZK2.connect();
-});

0 comments on commit 34f15b7

Please sign in to comment.
Something went wrong with that request. Please try again.