Skip to content

Commit

Permalink
add setEncoding() to replace 'data_as_buffer'. expose the public prop…
Browse files Browse the repository at this point in the history
…erties. add 'connect' method that takes a callback. fix zk_promise. fix tests
  • Loading branch information
Dave Dopson committed Dec 13, 2011
1 parent 1165e36 commit a947a95
Show file tree
Hide file tree
Showing 9 changed files with 223 additions and 101 deletions.
6 changes: 3 additions & 3 deletions lib/test-promise.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
sys = require("sys");
util = require("util");
var fs = require('./fs-promise');

// open a file and read it
fs.open("fs-promise.js", process.O_RDONLY).then(function(fd){
return fs.read(fd, 4096);
}).then(function(args){
sys.puts(args[0]);
util.puts(args[0]);
});

// does the same thing
fs.readFile("fs-promise.js").addCallback(sys.puts);
fs.readFile("fs-promise.js").addCallback(util.puts);
89 changes: 50 additions & 39 deletions lib/zk_promise.js
Original file line number Diff line number Diff line change
@@ -1,50 +1,61 @@
var assert = require ('assert');
var sys = require ('sys');
var promise = require("./promise");
var ZK = require ("./zookeeper").ZooKeeper;
exports.ZK = ZK;
var ZooKeeper = require ("./zookeeper");
var util = require('util');

ZK.prototype.context = {};
exports = module.exports = ZooKeeperPromise;
exports.ZK = ZooKeeperPromise; // backwards compatibility

ZK.prototype.on_connected = function () {
var deferred = promise.defer();
this.on (ZK.on_connected, function (zkk) {
deferred.resolve (zkk);
});
return deferred.promise;
function ZooKeeperPromise() {
var self = this;

ZooKeeper.apply(this);

return self;
}
util.inherits(ZooKeeperPromise, ZooKeeper);

ZooKeeperPromise.prototype.on_connected = function on_connected() {
var self = this;
var deferred = promise.defer();
self.on ('connect', function () {
deferred.resolve (self);
});
return deferred.promise;
};


convertZKAsyncFunction = function(asyncFunction){
return function(){
var deferred = promise.defer();
arguments.length ++;
arguments[arguments.length-1]=
function(rc, error, result){
if(rc) {
deferred.emitError(rc);
} else {
if(arguments.length > 3){
// if there are multiple success values, we return an array
Array.prototype.shift.call(arguments, 1);
Array.prototype.shift.call(arguments, 1);
deferred.emitSuccess(arguments);
}
else{
deferred.emitSuccess(result);
}
}
};
asyncFunction.apply (this, arguments);
return deferred.promise;

function convertAsync(fn){
return function() {
var self = this;
var deferred = promise.defer();
arguments.length ++;
arguments[arguments.length-1] = function(rc, error, result){
if(rc) {
deferred.emitError(rc);
} else {
if(arguments.length > 3){
// if there are multiple success values, we return an array
Array.prototype.shift.call(arguments, 1);
Array.prototype.shift.call(arguments, 1);
deferred.emitSuccess(arguments);
} else {
deferred.emitSuccess(result);
}
}
};
fn.apply (self, arguments);
return deferred.promise;
};
};

for (var f in ZK.prototype) {
var m = f.match(/^a(w?)_(.*)$/);
if (m) {
var new_func = m[1]? m[1] + "_" + m[2] : m[2];
ZK.prototype[new_func] = convertZKAsyncFunction (ZK.prototype[f]);
//console.log ("function %s is %j", f, sys.inspect(ZK.prototype[new_func],true,3));
}
for (var f in ZooKeeperPromise.prototype) {
var m = f.match(/^a(w?)_(.*)$/);
if (m) {
var new_func = m[1]? m[1] + "_" + m[2] : m[2];
ZooKeeperPromise.prototype[new_func] = convertAsync (ZooKeeperPromise.prototype[f]);
//console.log ("function %s is %j", f, util.inspect(ZK.prototype[new_func],true,3));
}
}

80 changes: 74 additions & 6 deletions lib/zookeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,50 @@ function ZooKeeper() {
self._native = new NativeZk();
self._native.emit = function(ev, a1, a2, a3) {
if(self.logger) self.logger("Emitting '" + ev + "' with args: " + a1 + ", " + a2 + ", " + a3);
if(typeof a1 === 'ZooKeeper') {
if(ev === 'connect') {
// the event is passing the native object. need to mangle this to return the wrapper instead
a1 = this;
a1 = self;
}
self.emit(ev, a1, a2, a3);
}

////////////////////////////////////////////////////////////////////////////////
// Public Properties
////////////////////////////////////////////////////////////////////////////////

function proxyProperty(name) {
self.__defineGetter__(name, function(){
return self._native[name];
});
self.__defineSetter__(name, function(val){
self._native[name] = val;
});
}

proxyProperty('state');
proxyProperty('timeout');
proxyProperty('client_id');
proxyProperty('is_unrecoverable');

self.encoding = null;

self.setEncoding = function setEncoding(val) {
self.encoding = val;
};

self.__defineGetter__('data_as_buffer', function(){
return self.encoding ? true : false;
});
self.__defineSetter__('data_as_buffer', function(val){
self.encoding = ((val == true) ? null : 'utf8');
});

}

util.inherits(ZooKeeper, EventEmitter);



////////////////////////////////////////////////////////////////////////////////
// Constants
////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -124,9 +158,37 @@ ZooKeeper.prototype.setLogger = function(logger) {
}
}

ZooKeeper.prototype.init = function init() {
ZooKeeper.prototype.init = function init(options) {
var self = this;
if(this.logger) this.logger("Calling init with " + util.inspect(arguments));
return this._native.init.apply(this._native, arguments);
if(!_.isUndefined(options.data_as_buffer)) {
self.data_as_buffer = options.data_as_buffer;
}
return this._native.init.call(this._native, options);
}

ZooKeeper.prototype.connect = function connect(options, cb) {
var self = this;
if(_.isFunction(options)) {
cb = options;
options = null;
}
self.init(options);

function errorHandler(err) {
self.removeListener('error', errorHandler);
self.removeListener('connect', connectHandler);
cb(err);
}

function connectHandler() {
self.removeListener('error', errorHandler);
self.removeListener('connect', connectHandler);
cb(null, self);
}

self.on('error', errorHandler);
self.on('connect', connectHandler);
}

ZooKeeper.prototype.close = function close() {
Expand All @@ -149,9 +211,15 @@ ZooKeeper.prototype.aw_exists = function aw_exists() {
return this._native.aw_exists.apply(this._native, arguments);
}

ZooKeeper.prototype.a_get = function a_get() {
ZooKeeper.prototype.a_get = function a_get(path, watch, data_cb) {
var self = this;
if(this.logger) this.logger("Calling a_get with " + util.inspect(arguments));
return this._native.a_get.apply(this._native, arguments);
return this._native.a_get.call(this._native, path, watch, function(rc, error, stat, data) {
if(self.encoding) {
data = data.toString(self.encoding);
}
data_cb(rc, error, stat, data);
});
}

ZooKeeper.prototype.aw_get = function aw_get() {
Expand Down
36 changes: 4 additions & 32 deletions src/node-zk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ namespace zk {
return; \
}

static Persistent<String> data_as_buffer;

#define DEFINE_STRING(ev,str) static Persistent<String> ev = NODE_PSYMBOL(str)
DEFINE_STRING (on_closed, "close");
DEFINE_STRING (on_connected, "connect");
Expand Down Expand Up @@ -157,7 +155,6 @@ class ZooKeeper: public ObjectWrap {
constructor_template->InstanceTemplate()->SetAccessor(String::NewSymbol("client_id"), ClientidPropertyGetter, 0, Local<Value>(), PROHIBITS_OVERWRITING, ReadOnly);
constructor_template->InstanceTemplate()->SetAccessor(String::NewSymbol("timeout"), SessionTimeoutPropertyGetter, 0, Local<Value>(), PROHIBITS_OVERWRITING, ReadOnly);
constructor_template->InstanceTemplate()->SetAccessor(String::NewSymbol("is_unrecoverable"), IsUnrecoverablePropertyGetter, 0, Local<Value>(), PROHIBITS_OVERWRITING, ReadOnly);
constructor_template->InstanceTemplate()->SetAccessor(String::NewSymbol("data_as_buffer"), DataAsBufferPropertyGetter, DataAsBufferPropertySetter);

target->Set(String::NewSymbol("ZooKeeper"), constructor_template->GetFunction());
}
Expand Down Expand Up @@ -274,13 +271,8 @@ class ZooKeeper: public ObjectWrap {
int32_t session_timeout = arg->Get(String::NewSymbol("timeout"))->ToInt32()->Value();
if (session_timeout == 0) session_timeout = 20000;

Local<Value> asBuffer = arg->Get(String::NewSymbol("data_as_buffer"));

ZooKeeper *zk = ObjectWrap::Unwrap<ZooKeeper>(args.This());
assert(zk);
if( asBuffer != Undefined() && asBuffer != Null() ) {
zk->data_as_buffer = asBuffer->ToBoolean()->Value();
}

if (!zk->realInit(*_hostPort, session_timeout))
return ErrnoException(errno, "zookeeper_init", "failed to init", __FILE__);
Expand Down Expand Up @@ -504,13 +496,9 @@ class ZooKeeper: public ObjectWrap {
LOG_DEBUG(("rc=%d, rc_string=%s, value=%s", rc, zerror(rc), value));
argv[2] = stat != 0 ? zkk->createStatObject (stat) : Object::Cast(*Null());
if( value != 0 ) {
if( zkk->data_as_buffer) {
Buffer* b = Buffer::New(value_len);
memcpy(BufferData(b), value, value_len);
argv[3] = Local<Value>::New(b->handle_);
} else {
argv[3] = String::New(value, value_len);
}
Buffer* b = Buffer::New(value_len);
memcpy(BufferData(b), value, value_len);
argv[3] = Local<Value>::New(b->handle_);
} else {
argv[3] = String::Cast(*Null());
}
Expand Down Expand Up @@ -653,21 +641,6 @@ class ZooKeeper: public ObjectWrap {
return Integer::New (zk->zhandle != 0? is_unrecoverable (zk->zhandle) : 0);
}

static Handle<Value> DataAsBufferPropertyGetter(Local<String> property, const AccessorInfo &info) {
HandleScope scope;
ZooKeeper *zk = ObjectWrap::Unwrap<ZooKeeper>(info.This());
assert(zk);
return Boolean::New (zk->data_as_buffer);
}

static void DataAsBufferPropertySetter(Local<String> property, Local<Value> value, const AccessorInfo& info) {
HandleScope scope;
ZooKeeper *zk = ObjectWrap::Unwrap<ZooKeeper>(info.This());
assert(zk);

zk->data_as_buffer = value->BooleanValue();
}

void realClose () {
if (is_closed)
return;
Expand Down Expand Up @@ -704,7 +677,7 @@ class ZooKeeper: public ObjectWrap {

#define ZERO_MEM(member) bzero(&(member), sizeof(member))

ZooKeeper () : zhandle(0), clientIdFile(0), fd(-1), data_as_buffer(true) {
ZooKeeper () : zhandle(0), clientIdFile(0), fd(-1) {
ZERO_MEM (myid);
ZERO_MEM (zk_io);
ZERO_MEM (zk_timer);
Expand All @@ -720,7 +693,6 @@ class ZooKeeper: public ObjectWrap {
int interest;
timeval tv;
ev_tstamp last_activity; // time of last zookeeper event loop activity
bool data_as_buffer;
bool is_closed;
};

Expand Down
55 changes: 55 additions & 0 deletions test/foo.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#!/usr/bin/env node

var util = require('util');
var _ = require('underscore');

function MyFunctionalShape() {
if(! this instanceof MyFunctionalShape) {
return new MyFunctionalShape();
}
var self = this;
console.log('parent ctor: this=' + this);
}

MyFunctionalShape.prototype.baseMethod = function() { }

function MyFunctionalCircle() {
if(!_.isObject(this) || !(this instanceof MyFunctionalCircle)) {
return new MyFunctionalCircle(arguments);
}
var self = this;
MyFunctionalShape.apply(self);

self.fn = function() { }
console.log('child ctor: this=' + this);
}

MyFunctionalCircle.super_ = MyFunctionalShape;
MyFunctionalCircle.prototype = Object.create(MyFunctionalShape.prototype, {
constructor: {
value: MyFunctionalCircle,
enumerable: false,
writable: true,
configurable: true
}
});

MyFunctionalCircle.prototype.method = function() {
console.log("a: " + util.inspect(this, true, 99));

};

MyFunctionalShape.prototype.baseMethod2 = function() {};

function lookAt(obj, name) {
console.log("Looking at " + name + ":");
for(key in obj) {
console.log(name + "['" + key + "'] = " + obj[key]);
}
}


var a = new MyFunctionalCircle();
lookAt(a, 'a');
console.log("a: " + util.inspect(a, true, 99));
a.method();
Loading

0 comments on commit a947a95

Please sign in to comment.