Permalink
Browse files

Merge remote branch 'broofa/master'

  • Loading branch information...
2 parents 0165290 + 8fa2d0d commit 1ee4f7e99600c4db9d98a6eded69e576e0021458 @wadey wadey committed Jan 30, 2011
Showing with 66 additions and 66 deletions.
  1. +15 −3 lib/thrift/connection.js
  2. +51 −63 lib/thrift/transport.js
View
@@ -11,16 +11,28 @@ var int32FramedReceiver = exports.int32FramedReceiver = function (callback) {
var frameLeft = 0,
framePos = 0,
frame = null;
+ var residual = null;
return function(data) {
+ // Prepend any residual data from our previous read
+ if (residual) {
+ var dat = new Buffer(data.length + residual.length);
+ residual.copy(dat, 0, 0);
+ data.copy(dat, residual.length, 0);
+ residual = null;
+ }
+
// var buf = new Buffer(data, 'binary');
// console.log(buf);
// framed transport
while (data.length) {
if (frameLeft === 0) {
// TODO assumes we have all 4 bytes
if (data.length < 4) {
- throw Error("Not enough bytes");
+ console.log("Expecting > 4 bytes, found only " + data.length);
+ residual = data;
+ break;
+ //throw Error("Expecting > 4 bytes, found only " + data.length);
}
frameLeft = BinaryParser.toInt(data.slice(0,4));
frame = new Buffer(frameLeft);
@@ -42,7 +54,7 @@ var int32FramedReceiver = exports.int32FramedReceiver = function (callback) {
}
}
};
-}
+};
var Connection = exports.Connection = function(stream, options) {
var self = this;
@@ -89,7 +101,7 @@ var Connection = exports.Connection = function(stream, options) {
self.client['recv_' + r.fname](input, r.mtype, r.rseqid);
// self.emit("data", data);
}));
-}
+};
sys.inherits(Connection, EventEmitter);
exports.createConnection = function(host, port, options) {
View
@@ -1,69 +1,57 @@
-var TMemoryBuffer = exports.TMemoryBuffer = function(buffer, flushCallback) {
- if (buffer !== undefined) {
- this.recv_buf = buffer;
- } else {
- this.recv_buf = new Buffer(0);
- }
- this.recv_buf_sz = this.recv_buf.length;
- this.send_buf = [];
- this.rpos = 0;
- this.flushCallback = flushCallback;
-}
-
-TMemoryBuffer.prototype.isOpen = function() {
- // TODO
- return true;
-}
-
-TMemoryBuffer.prototype.open = function() {
-}
-
-TMemoryBuffer.prototype.close = function() {
-}
-
-TMemoryBuffer.prototype.read = function(len) {
- var avail = this.recv_buf_sz - this.rpos;
- // console.log("avail: " + avail);
-
- if(avail == 0)
- return new Buffer(0);
-
- var give = len
-
- if(avail < len) {
- console.log("asked for: " + len);
- throw new Error("asked for too much");
- give = avail
- }
-
- // console.log(this.rpos + "," + give);
- var ret = this.recv_buf.slice(this.rpos,this.rpos + give)
- this.rpos += give
- // console.log(ret);
-
- //clear buf when complete?
- return ret
+var emptyBuf = new Buffer(0);
+
+var TMemoryBuffer = exports.TMemoryBuffer = function(buffer, callback) {
+ this.inBuf = buffer || emptyBuf;
+ this.outBuffers = [];
+ this.outCount = 0;
+ this.readPos = 0;
+ this.onFlush = callback;
+};
+
+TMemoryBuffer.prototype = {
+ // TODO: Implement open/close support
+ isOpen: function() {return true;},
+ open: function() {},
+ close: function() {},
+
+ read: function(len) {
+ var end = this.readPos + len;
+
+ if (this.inBuf.length < end) {
+ throw new Error('read(' + len + ') failed - not enough data');
+ }
-}
+ var buf = this.inBuf.slice(this.readPos, end);
+ this.readPos = end;
+ return buf;
+ },
-TMemoryBuffer.prototype.readAll = function() {
- return this.recv_buf;
-}
+ readAll: function() {
+ return this.inBuf;
+ },
-TMemoryBuffer.prototype.write = function(buf) {
- // TODO
- if (typeof(buf) === "string") {
- for (var i = 0; i < buf.length; ++i) {
- this.send_buf.push(buf.charCodeAt(i));
+ write: function(buf, encoding) {
+ if (typeof(buf) === "string") {
+ // Defaulting to ascii encoding here since that's more like the original
+ // code, but I feel like 'utf8' would be a better choice.
+ buf = new Buffer(buf, encoding || 'ascii');
}
- } else {
- for (var i = 0; i < buf.length; ++i) {
- this.send_buf.push(buf[i]);
+ this.outBuffers.push(buf);
+ this.outCount += buf.length;
+ },
+
+ flush: function() {
+ var out = new Buffer(this.outCount), pos = 0;
+ this.outBuffers.forEach(function(buf) {
+ buf.copy(out, pos, 0);
+ pos += buf.length;
+ });
+
+ if (this.onFlush) {
+ this.onFlush(out);
}
- }
-}
-TMemoryBuffer.prototype.flush = function() {
- this.flushCallback(new Buffer(this.send_buf));
- this.send_buf = [];
-}
+ this.outBuffers = [];
+ this.outCount = 0;
+ }
+};

0 comments on commit 1ee4f7e

Please sign in to comment.