From a947a95a1c13f6ac0a8ea3af591f6ee8b23b11e5 Mon Sep 17 00:00:00 2001 From: Dave Dopson Date: Mon, 12 Dec 2011 18:33:13 -0800 Subject: [PATCH] add setEncoding() to replace 'data_as_buffer'. expose the public properties. add 'connect' method that takes a callback. fix zk_promise. fix tests --- lib/test-promise.js | 6 +-- lib/zk_promise.js | 89 ++++++++++++++++++--------------- lib/zookeeper.js | 80 ++++++++++++++++++++++++++--- src/node-zk.cpp | 36 ++----------- test/foo.js | 55 ++++++++++++++++++++ test/zk_test_buffer.js | 32 +++++++----- test/zk_test_create.js | 2 +- test/zk_test_utf8.js | 2 +- test/zk_test_watcher_promise.js | 22 +++++--- 9 files changed, 223 insertions(+), 101 deletions(-) create mode 100755 test/foo.js diff --git a/lib/test-promise.js b/lib/test-promise.js index 737db3d2..00451d0f 100644 --- a/lib/test-promise.js +++ b/lib/test-promise.js @@ -1,12 +1,12 @@ -sys = require("sys"); +util = require("util"); var fs = require('./fs-promise'); // open a file and read it fs.open("fs-promise.js", process.O_RDONLY).then(function(fd){ return fs.read(fd, 4096); }).then(function(args){ - sys.puts(args[0]); + util.puts(args[0]); }); // does the same thing -fs.readFile("fs-promise.js").addCallback(sys.puts); \ No newline at end of file +fs.readFile("fs-promise.js").addCallback(util.puts); diff --git a/lib/zk_promise.js b/lib/zk_promise.js index aa45b7d9..dd87f982 100644 --- a/lib/zk_promise.js +++ b/lib/zk_promise.js @@ -1,50 +1,61 @@ var assert = require ('assert'); -var sys = require ('sys'); var promise = require("./promise"); -var ZK = require ("./zookeeper").ZooKeeper; -exports.ZK = ZK; +var ZooKeeper = require ("./zookeeper"); +var util = require('util'); -ZK.prototype.context = {}; +exports = module.exports = ZooKeeperPromise; +exports.ZK = ZooKeeperPromise; // backwards compatibility -ZK.prototype.on_connected = function () { - var deferred = promise.defer(); - this.on (ZK.on_connected, function (zkk) { - deferred.resolve (zkk); - }); - return deferred.promise; +function ZooKeeperPromise() { + var self = this; + + ZooKeeper.apply(this); + + return self; +} +util.inherits(ZooKeeperPromise, ZooKeeper); + +ZooKeeperPromise.prototype.on_connected = function on_connected() { + var self = this; + var deferred = promise.defer(); + self.on ('connect', function () { + deferred.resolve (self); + }); + return deferred.promise; }; -convertZKAsyncFunction = function(asyncFunction){ - return function(){ - var deferred = promise.defer(); - arguments.length ++; - arguments[arguments.length-1]= - function(rc, error, result){ - if(rc) { - deferred.emitError(rc); - } else { - if(arguments.length > 3){ - // if there are multiple success values, we return an array - Array.prototype.shift.call(arguments, 1); - Array.prototype.shift.call(arguments, 1); - deferred.emitSuccess(arguments); - } - else{ - deferred.emitSuccess(result); - } - } - }; - asyncFunction.apply (this, arguments); - return deferred.promise; + +function convertAsync(fn){ + return function() { + var self = this; + var deferred = promise.defer(); + arguments.length ++; + arguments[arguments.length-1] = function(rc, error, result){ + if(rc) { + deferred.emitError(rc); + } else { + if(arguments.length > 3){ + // if there are multiple success values, we return an array + Array.prototype.shift.call(arguments, 1); + Array.prototype.shift.call(arguments, 1); + deferred.emitSuccess(arguments); + } else { + deferred.emitSuccess(result); + } + } }; + fn.apply (self, arguments); + return deferred.promise; + }; }; -for (var f in ZK.prototype) { - var m = f.match(/^a(w?)_(.*)$/); - if (m) { - var new_func = m[1]? m[1] + "_" + m[2] : m[2]; - ZK.prototype[new_func] = convertZKAsyncFunction (ZK.prototype[f]); - //console.log ("function %s is %j", f, sys.inspect(ZK.prototype[new_func],true,3)); - } +for (var f in ZooKeeperPromise.prototype) { + var m = f.match(/^a(w?)_(.*)$/); + if (m) { + var new_func = m[1]? m[1] + "_" + m[2] : m[2]; + ZooKeeperPromise.prototype[new_func] = convertAsync (ZooKeeperPromise.prototype[f]); + //console.log ("function %s is %j", f, util.inspect(ZK.prototype[new_func],true,3)); + } } + diff --git a/lib/zookeeper.js b/lib/zookeeper.js index ae6bc07a..c5b4a634 100644 --- a/lib/zookeeper.js +++ b/lib/zookeeper.js @@ -22,16 +22,50 @@ function ZooKeeper() { self._native = new NativeZk(); self._native.emit = function(ev, a1, a2, a3) { if(self.logger) self.logger("Emitting '" + ev + "' with args: " + a1 + ", " + a2 + ", " + a3); - if(typeof a1 === 'ZooKeeper') { + if(ev === 'connect') { // the event is passing the native object. need to mangle this to return the wrapper instead - a1 = this; + a1 = self; } self.emit(ev, a1, a2, a3); } + + //////////////////////////////////////////////////////////////////////////////// + // Public Properties + //////////////////////////////////////////////////////////////////////////////// + + function proxyProperty(name) { + self.__defineGetter__(name, function(){ + return self._native[name]; + }); + self.__defineSetter__(name, function(val){ + self._native[name] = val; + }); + } + + proxyProperty('state'); + proxyProperty('timeout'); + proxyProperty('client_id'); + proxyProperty('is_unrecoverable'); + + self.encoding = null; + + self.setEncoding = function setEncoding(val) { + self.encoding = val; + }; + + self.__defineGetter__('data_as_buffer', function(){ + return self.encoding ? true : false; + }); + self.__defineSetter__('data_as_buffer', function(val){ + self.encoding = ((val == true) ? null : 'utf8'); + }); + } util.inherits(ZooKeeper, EventEmitter); + + //////////////////////////////////////////////////////////////////////////////// // Constants //////////////////////////////////////////////////////////////////////////////// @@ -124,9 +158,37 @@ ZooKeeper.prototype.setLogger = function(logger) { } } -ZooKeeper.prototype.init = function init() { +ZooKeeper.prototype.init = function init(options) { + var self = this; if(this.logger) this.logger("Calling init with " + util.inspect(arguments)); - return this._native.init.apply(this._native, arguments); + if(!_.isUndefined(options.data_as_buffer)) { + self.data_as_buffer = options.data_as_buffer; + } + return this._native.init.call(this._native, options); +} + +ZooKeeper.prototype.connect = function connect(options, cb) { + var self = this; + if(_.isFunction(options)) { + cb = options; + options = null; + } + self.init(options); + + function errorHandler(err) { + self.removeListener('error', errorHandler); + self.removeListener('connect', connectHandler); + cb(err); + } + + function connectHandler() { + self.removeListener('error', errorHandler); + self.removeListener('connect', connectHandler); + cb(null, self); + } + + self.on('error', errorHandler); + self.on('connect', connectHandler); } ZooKeeper.prototype.close = function close() { @@ -149,9 +211,15 @@ ZooKeeper.prototype.aw_exists = function aw_exists() { return this._native.aw_exists.apply(this._native, arguments); } -ZooKeeper.prototype.a_get = function a_get() { +ZooKeeper.prototype.a_get = function a_get(path, watch, data_cb) { + var self = this; if(this.logger) this.logger("Calling a_get with " + util.inspect(arguments)); - return this._native.a_get.apply(this._native, arguments); + return this._native.a_get.call(this._native, path, watch, function(rc, error, stat, data) { + if(self.encoding) { + data = data.toString(self.encoding); + } + data_cb(rc, error, stat, data); + }); } ZooKeeper.prototype.aw_get = function aw_get() { diff --git a/src/node-zk.cpp b/src/node-zk.cpp index cc5d0ca8..ec1709c0 100644 --- a/src/node-zk.cpp +++ b/src/node-zk.cpp @@ -25,8 +25,6 @@ namespace zk { return; \ } -static Persistent data_as_buffer; - #define DEFINE_STRING(ev,str) static Persistent ev = NODE_PSYMBOL(str) DEFINE_STRING (on_closed, "close"); DEFINE_STRING (on_connected, "connect"); @@ -157,7 +155,6 @@ class ZooKeeper: public ObjectWrap { constructor_template->InstanceTemplate()->SetAccessor(String::NewSymbol("client_id"), ClientidPropertyGetter, 0, Local(), PROHIBITS_OVERWRITING, ReadOnly); constructor_template->InstanceTemplate()->SetAccessor(String::NewSymbol("timeout"), SessionTimeoutPropertyGetter, 0, Local(), PROHIBITS_OVERWRITING, ReadOnly); constructor_template->InstanceTemplate()->SetAccessor(String::NewSymbol("is_unrecoverable"), IsUnrecoverablePropertyGetter, 0, Local(), PROHIBITS_OVERWRITING, ReadOnly); - constructor_template->InstanceTemplate()->SetAccessor(String::NewSymbol("data_as_buffer"), DataAsBufferPropertyGetter, DataAsBufferPropertySetter); target->Set(String::NewSymbol("ZooKeeper"), constructor_template->GetFunction()); } @@ -274,13 +271,8 @@ class ZooKeeper: public ObjectWrap { int32_t session_timeout = arg->Get(String::NewSymbol("timeout"))->ToInt32()->Value(); if (session_timeout == 0) session_timeout = 20000; - Local asBuffer = arg->Get(String::NewSymbol("data_as_buffer")); - ZooKeeper *zk = ObjectWrap::Unwrap(args.This()); assert(zk); - if( asBuffer != Undefined() && asBuffer != Null() ) { - zk->data_as_buffer = asBuffer->ToBoolean()->Value(); - } if (!zk->realInit(*_hostPort, session_timeout)) return ErrnoException(errno, "zookeeper_init", "failed to init", __FILE__); @@ -504,13 +496,9 @@ class ZooKeeper: public ObjectWrap { LOG_DEBUG(("rc=%d, rc_string=%s, value=%s", rc, zerror(rc), value)); argv[2] = stat != 0 ? zkk->createStatObject (stat) : Object::Cast(*Null()); if( value != 0 ) { - if( zkk->data_as_buffer) { - Buffer* b = Buffer::New(value_len); - memcpy(BufferData(b), value, value_len); - argv[3] = Local::New(b->handle_); - } else { - argv[3] = String::New(value, value_len); - } + Buffer* b = Buffer::New(value_len); + memcpy(BufferData(b), value, value_len); + argv[3] = Local::New(b->handle_); } else { argv[3] = String::Cast(*Null()); } @@ -653,21 +641,6 @@ class ZooKeeper: public ObjectWrap { return Integer::New (zk->zhandle != 0? is_unrecoverable (zk->zhandle) : 0); } - static Handle DataAsBufferPropertyGetter(Local property, const AccessorInfo &info) { - HandleScope scope; - ZooKeeper *zk = ObjectWrap::Unwrap(info.This()); - assert(zk); - return Boolean::New (zk->data_as_buffer); - } - - static void DataAsBufferPropertySetter(Local property, Local value, const AccessorInfo& info) { - HandleScope scope; - ZooKeeper *zk = ObjectWrap::Unwrap(info.This()); - assert(zk); - - zk->data_as_buffer = value->BooleanValue(); - } - void realClose () { if (is_closed) return; @@ -704,7 +677,7 @@ class ZooKeeper: public ObjectWrap { #define ZERO_MEM(member) bzero(&(member), sizeof(member)) - ZooKeeper () : zhandle(0), clientIdFile(0), fd(-1), data_as_buffer(true) { + ZooKeeper () : zhandle(0), clientIdFile(0), fd(-1) { ZERO_MEM (myid); ZERO_MEM (zk_io); ZERO_MEM (zk_timer); @@ -720,7 +693,6 @@ class ZooKeeper: public ObjectWrap { int interest; timeval tv; ev_tstamp last_activity; // time of last zookeeper event loop activity - bool data_as_buffer; bool is_closed; }; diff --git a/test/foo.js b/test/foo.js new file mode 100755 index 00000000..6444195c --- /dev/null +++ b/test/foo.js @@ -0,0 +1,55 @@ +#!/usr/bin/env node + +var util = require('util'); +var _ = require('underscore'); + +function MyFunctionalShape() { + if(! this instanceof MyFunctionalShape) { + return new MyFunctionalShape(); + } + var self = this; + console.log('parent ctor: this=' + this); +} + +MyFunctionalShape.prototype.baseMethod = function() { } + +function MyFunctionalCircle() { + if(!_.isObject(this) || !(this instanceof MyFunctionalCircle)) { + return new MyFunctionalCircle(arguments); + } + var self = this; + MyFunctionalShape.apply(self); + + self.fn = function() { } + console.log('child ctor: this=' + this); +} + +MyFunctionalCircle.super_ = MyFunctionalShape; +MyFunctionalCircle.prototype = Object.create(MyFunctionalShape.prototype, { + constructor: { + value: MyFunctionalCircle, + enumerable: false, + writable: true, + configurable: true + } +}); + +MyFunctionalCircle.prototype.method = function() { +console.log("a: " + util.inspect(this, true, 99)); + +}; + +MyFunctionalShape.prototype.baseMethod2 = function() {}; + +function lookAt(obj, name) { + console.log("Looking at " + name + ":"); + for(key in obj) { + console.log(name + "['" + key + "'] = " + obj[key]); + } +} + + +var a = new MyFunctionalCircle(); +lookAt(a, 'a'); +console.log("a: " + util.inspect(a, true, 99)); +a.method(); diff --git a/test/zk_test_buffer.js b/test/zk_test_buffer.js index 1cb34868..7d0fdeb1 100644 --- a/test/zk_test_buffer.js +++ b/test/zk_test_buffer.js @@ -7,23 +7,29 @@ var zk = new ZK(); var connect = (process.argv[2] || 'localhost:2181'); var err = false; -zk.init({connect:connect, timeout:5000, debug_level:ZK.ZOO_LOG_LEVEL_WARN, host_order_deterministic:false, data_as_buffer:false}); -zk.on('on_connected', function (zkk) { - console.log('zk session established, id=%s', zkk.client_id); +zk.connect({ + connect:connect, + timeout:5000, + debug_level:ZK.ZOO_LOG_LEVEL_WARN, + host_order_deterministic:false, + data_as_buffer:false +}, function (err) { + if(err) throw err; + console.log('zk session established, id=%s', zk.client_id); var b = new Buffer('\u00bd + \u00bc = \u00be'); var b2 = new Buffer('\u00bd + \u00bc = \u00be :: \u00bd + \u00bc = \u00be'); function phase2() { zk.data_as_buffer = true; - zkk.a_create('/node.js1', b, ZK.ZOO_SEQUENCE | ZK.ZOO_EPHEMERAL, function(rc, error, path) { - // console.log(util.inspect(zkk)); + zk.a_create('/node.js1', b, ZK.ZOO_SEQUENCE | ZK.ZOO_EPHEMERAL, function(rc, error, path) { + // console.log(util.inspect(zk)); if (rc != 0 && rc != -110) { console.log("zk node create result: %d, error: '%s', path=%s", rc, error, path); } else { // now get it - zkk.a_get(path, false, function(rc, error, stat, value) { // response - zkk.a_set(path, b2, 0, function(rc2, error2, stat2) { // response - zkk.a_get(path, false, function(rc3, error3, stat3, value3) { // response + zk.a_get(path, false, function(rc, error, stat, value) { // response + zk.a_set(path, b2, 0, function(rc2, error2, stat2) { // response + zk.a_get(path, false, function(rc3, error3, stat3, value3) { // response if( Buffer.isBuffer(value3) === false ) { console.log('ERROR (p2) value3 is not a Buffer, is: ' + (typeof(value3))); console.log(util.inspect(value3)); @@ -40,18 +46,18 @@ zk.on('on_connected', function (zkk) { }); } - zkk.a_create('/node.js1', b, ZK.ZOO_SEQUENCE | ZK.ZOO_EPHEMERAL, function(rc, error, path) { - console.log(util.inspect(zkk)); + zk.a_create('/node.js1', b, ZK.ZOO_SEQUENCE | ZK.ZOO_EPHEMERAL, function(rc, error, path) { + console.log(util.inspect(zk)); if (rc != 0 && rc != -110) { // -110 means "already created" console.log("zk node create result: %d, error: '%s', path=%s", rc, error, path); } else { console.log("now getting the thing"); // now get it - zkk.a_get(path, false, function(rc, error, stat, value) { // response + zk.a_get(path, false, function(rc, error, stat, value) { // response console.log("get returned, now setting the thing"); - zkk.a_set(path, b2, 0, function(rc2, error2, stat2) { // response + zk.a_set(path, b2, 0, function(rc2, error2, stat2) { // response console.log("set returned, now getting the thing"); - zkk.a_get(path, false, function(rc3, error3, stat3, value3) { // response + zk.a_get(path, false, function(rc3, error3, stat3, value3) { // response if( Buffer.isBuffer(value3) ) { console.log('ERROR (p1) value3 should be a Buffer, is: ' + (typeof(value3))); console.log(util.inspect(value3)); diff --git a/test/zk_test_create.js b/test/zk_test_create.js index b09e6a8a..001ee0ae 100644 --- a/test/zk_test_create.js +++ b/test/zk_test_create.js @@ -13,7 +13,7 @@ function zkTest (seq_, callback) { console.log ("myHolder=%j", myHolder); //console.log ("myCallee=%j", myCallee); this.zk.init ({connect:connect, timeout:200000, debug_level:ZK.ZOO_LOG_LEVEL_INFO, host_order_deterministic:false}); - this.zk.on (ZK.on_connected, function (zkk, clientid) { + this.zk.on ('connect', function (zkk, clientid) { console.log ("session #%d connected ok", seq_); var counter = 0; for (var i = 0; i < N; i ++) { diff --git a/test/zk_test_utf8.js b/test/zk_test_utf8.js index e9b04fd4..8c4d30c8 100644 --- a/test/zk_test_utf8.js +++ b/test/zk_test_utf8.js @@ -5,7 +5,7 @@ var zk = new ZK(); var connect = (process.argv[2] || 'localhost:2181'); zk.init({connect:connect, timeout:200000, debug_level:ZK.ZOO_LOG_LEVEL_WARN, host_order_deterministic:false}); -zk.on('connected', function (zkk) { +zk.on('connect', function (zkk) { console.log("zk session established, id=%s", zkk.client_id); var str = '\u00bd + \u00bc = \u00be'; var data = new String(str); diff --git a/test/zk_test_watcher_promise.js b/test/zk_test_watcher_promise.js index c1a15029..0cbf587a 100644 --- a/test/zk_test_watcher_promise.js +++ b/test/zk_test_watcher_promise.js @@ -1,5 +1,5 @@ var promise = require("../lib/promise"); -var ZK = require("../lib/zk_promise").ZK; +var ZK = require("../lib/zk_promise"); var assert = require ('assert'); var util = require('util'); var connect = (process.argv[2] || 'localhost:2181'); @@ -8,10 +8,18 @@ var connect = (process.argv[2] || 'localhost:2181'); var deferred_watcher_ready = promise.defer(); var deferred_watcher_triggered = promise.defer(); -var zk_config = {connect:connect, debug_level:ZK.ZOO_LOG_LEVEL_WARN, timeout:20000, host_order_deterministic:false}; +var zk_config = { + connect:connect, + debug_level:ZK.ZOO_LOG_LEVEL_WARN, + timeout:20000, + host_order_deterministic:false +}; //reader -var zk_r = new ZK ().init (zk_config); +var zk_r = new ZK (); +zk_r.setLogger(true); +zk_r.init (zk_config); +var context = {}; zk_r.on_connected(). then ( function (zkk){ @@ -20,7 +28,7 @@ zk_r.on_connected(). } ).then ( function (path) { - zk_r.context.path = path; + context.path = path; console.log ("node created path=%s", path); return zk_r.w_get (path, function (type, state, path_w) { // this is a watcher @@ -32,7 +40,7 @@ zk_r.on_connected(). ).then ( function (stat_and_value) { // this is the response from w_get above console.log ("get node: stat=%j, value=%j", stat_and_value[0], stat_and_value[1]); - deferred_watcher_ready.resolve (zk_r.context.path); + deferred_watcher_ready.resolve (context.path); return deferred_watcher_triggered; } ).then ( @@ -45,7 +53,9 @@ zk_r.on_connected(). ); //writer -var zk_w = new ZK().init (zk_config); +var zk_w = new ZK(); +zk_w.setLogger(true); +zk_w.init (zk_config); zk_w.on_connected(). then ( function (zkk) {