Skip to content
Browse files

THRIFT-1129: Merge remote-tracking branch 'tagomoris/master'

  • Loading branch information...
2 parents c012903 + 5b77068 commit ae8be1ff8af50607da52f51c06556f05842692e5 @wadey committed
View
40 examples/client_multitransport.js
@@ -0,0 +1,40 @@
+var thrift = require('thrift'),
+ ttransport = require('thrift/transport');
+
+var UserStorage = require('./gen-nodejs/UserStorage'),
+ ttypes = require('./gen-nodejs/user_types');
+
+var f_conn = thrift.createConnection('localhost', 9090), // default: framed
+ f_client = thrift.createClient(UserStorage, f_conn);
+var b_conn = thrift.createConnection('localhost', 9091, {transport: ttransport.TBufferedTransport}),
+ b_client = thrift.createClient(UserStorage, b_conn);
+var user1 = new ttypes.UserProfile({uid: 1,
+ name: "Mark Slee",
+ blurb: "I'll find something to put here."});
+var user2 = new ttypes.UserProfile({uid: 2,
+ name: "Satoshi Tagomori",
+ blurb: "ok, let's test with buffered transport."});
+
+f_conn.on('error', function(err) {
+ console.error("framed:", err);
+});
+
+f_client.store(user1, function(err, response) {
+ if (err) { console.error(err); return; }
+
+ console.log("stored:", user1.uid, " as ", user1.name);
+ b_client.retrieve(user1.uid, function(err, responseUser) {
+ if (err) { console.error(err); return; }
+ console.log("retrieved:", responseUser.uid, " as ", responseUser.name);
+ });
+});
+
+b_client.store(user2, function(err, response) {
+ if (err) { console.error(err); return; }
+
+ console.log("stored:", user2.uid, " as ", user2.name);
+ f_client.retrieve(user2.uid, function(err, responseUser) {
+ if (err) { console.error(err); return; }
+ console.log("retrieved:", responseUser.uid, " as ", responseUser.name);
+ });
+});
View
224 examples/gen-nodejs/BucketStoreMapping.js
@@ -0,0 +1,224 @@
+//
+// Autogenerated by Thrift
+//
+// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+//
+var Thrift = require('thrift').Thrift;
+
+var ttypes = require('./bucketupdater_types');
+//HELPER FUNCTIONS AND STRUCTURES
+
+var BucketStoreMapping_getMapping_args = function(args){
+ this.category = null
+if( args != null ){ if (null != args.category)
+ this.category = args.category
+}}
+BucketStoreMapping_getMapping_args.prototype = {}
+BucketStoreMapping_getMapping_args.prototype.read = function(input){
+ var ret = input.readStructBegin()
+ while (1)
+ {
+ var ret = input.readFieldBegin()
+ var fname = ret.fname
+ var ftype = ret.ftype
+ var fid = ret.fid
+ if (ftype == Thrift.Type.STOP)
+ break
+ switch(fid)
+ {
+ case 1: if (ftype == Thrift.Type.STRING) {
+ this.category = input.readString()
+ } else {
+ input.skip(ftype)
+ }
+ break
+ default:
+ input.skip(ftype)
+ }
+ input.readFieldEnd()
+ }
+ input.readStructEnd()
+ return
+}
+
+BucketStoreMapping_getMapping_args.prototype.write = function(output){
+ output.writeStructBegin('BucketStoreMapping_getMapping_args')
+ if (null != this.category) {
+ output.writeFieldBegin('category', Thrift.Type.STRING, 1)
+ output.writeString(this.category)
+ output.writeFieldEnd()
+ }
+ output.writeFieldStop()
+ output.writeStructEnd()
+ return
+}
+
+var BucketStoreMapping_getMapping_result = function(args){
+ this.success = null
+ this.e = null
+if( args != null ){ if (null != args.success)
+ this.success = args.success
+ if (null != args.e)
+ this.e = args.e
+}}
+BucketStoreMapping_getMapping_result.prototype = {}
+BucketStoreMapping_getMapping_result.prototype.read = function(input){
+ var ret = input.readStructBegin()
+ while (1)
+ {
+ var ret = input.readFieldBegin()
+ var fname = ret.fname
+ var ftype = ret.ftype
+ var fid = ret.fid
+ if (ftype == Thrift.Type.STOP)
+ break
+ switch(fid)
+ {
+ case 0: if (ftype == Thrift.Type.MAP) {
+ {
+ var _size0 = 0
+ var rtmp3
+ this.success = {}
+ var _ktype1 = 0
+ var _vtype2 = 0
+ rtmp3 = input.readMapBegin()
+ _ktype1= rtmp3.ktype
+ _vtype2= rtmp3.vtype
+ _size0= rtmp3.size
+ for (var _i4 = 0; _i4 < _size0; ++_i4)
+ {
+ key5 = null
+ val6 = null
+ key5 = input.readI32()
+ val6 = new ttypes.HostPort()
+ val6.read(input)
+ this.success[key5] = val6
+ }
+ input.readMapEnd()
+ }
+ } else {
+ input.skip(ftype)
+ }
+ break
+ case 1: if (ftype == Thrift.Type.STRUCT) {
+ this.e = new ttypes.BucketStoreMappingException()
+ this.e.read(input)
+ } else {
+ input.skip(ftype)
+ }
+ break
+ default:
+ input.skip(ftype)
+ }
+ input.readFieldEnd()
+ }
+ input.readStructEnd()
+ return
+}
+
+BucketStoreMapping_getMapping_result.prototype.write = function(output){
+ output.writeStructBegin('BucketStoreMapping_getMapping_result')
+ if (null != this.success) {
+ output.writeFieldBegin('success', Thrift.Type.MAP, 0)
+ {
+ output.writeMapBegin(Thrift.Type.I32, Thrift.Type.STRUCT, Thrift.objectLength(this.success))
+ {
+ for(var kiter7 in this.success) {
+ if (this.success.hasOwnProperty(kiter7))
+ {
+ var viter8 = this.success[kiter7]
+ output.writeI32(kiter7)
+ viter8.write(output)
+ }
+ }
+ }
+ output.writeMapEnd()
+ }
+ output.writeFieldEnd()
+ }
+ if (null != this.e) {
+ output.writeFieldBegin('e', Thrift.Type.STRUCT, 1)
+ this.e.write(output)
+ output.writeFieldEnd()
+ }
+ output.writeFieldStop()
+ output.writeStructEnd()
+ return
+}
+
+var BucketStoreMappingClient = exports.Client = function(output, pClass) {
+ this.output = output;
+ this.pClass = pClass;
+ this.seqid = 0;
+ this._reqs = {}
+}
+BucketStoreMappingClient.prototype = {}
+BucketStoreMappingClient.prototype.getMapping = function(category,callback){
+ this.seqid += 1;
+ this._reqs[this.seqid] = callback;
+ this.send_getMapping(category)
+}
+
+BucketStoreMappingClient.prototype.send_getMapping = function(category){
+ var output = new this.pClass(this.output);
+ output.writeMessageBegin('getMapping', Thrift.MessageType.CALL, this.seqid)
+ var args = new BucketStoreMapping_getMapping_args()
+ args.category = category
+ args.write(output)
+ output.writeMessageEnd()
+ return this.output.flush()
+}
+
+BucketStoreMappingClient.prototype.recv_getMapping = function(input,mtype,rseqid){
+ var callback = this._reqs[rseqid] || function() {};
+ delete this._reqs[rseqid];
+ if (mtype == Thrift.MessageType.EXCEPTION) {
+ var x = new Thrift.TApplicationException()
+ x.read(input)
+ input.readMessageEnd()
+ return callback(x);
+ }
+ var result = new BucketStoreMapping_getMapping_result()
+ result.read(input)
+ input.readMessageEnd()
+
+ if (null != result.e) {
+ return callback(result.e);
+ }
+ if (null != result.success ) {
+ return callback(null, result.success);
+ }
+ return callback("getMapping failed: unknown result");
+}
+var BucketStoreMappingProcessor = exports.Processor = function(handler) {
+ this._handler = handler
+}
+BucketStoreMappingProcessor.prototype.process = function(input, output) {
+ var r = input.readMessageBegin()
+ if (this['process_' + r.fname]) {
+ return this['process_' + r.fname].call(this, r.rseqid, input, output)
+ } else {
+ input.skip(Thrift.Type.STRUCT)
+ input.readMessageEnd()
+ var x = new Thrift.TApplicationException(Thrift.TApplicationExceptionType.UNKNOWN_METHOD, 'Unknown function ' + r.fname)
+ output.writeMessageBegin(r.fname, Thrift.MessageType.Exception, r.rseqid)
+ x.write(output)
+ output.writeMessageEnd()
+ output.flush()
+ }
+}
+
+BucketStoreMappingProcessor.prototype.process_getMapping = function(seqid, input, output) {
+ var args = new BucketStoreMapping_getMapping_args()
+ args.read(input)
+ input.readMessageEnd()
+ var result = new BucketStoreMapping_getMapping_result()
+ this._handler.getMapping(args.category, function(success) {
+ result.success = success
+ output.writeMessageBegin("getMapping", Thrift.MessageType.REPLY, seqid)
+ result.write(output)
+ output.writeMessageEnd()
+ output.flush()
+ })
+}
+
View
1,676 examples/gen-nodejs/FacebookService.js
1,676 additions, 0 deletions not shown because the diff is too large. Please use a local Git client to view these changes.
View
127 examples/gen-nodejs/bucketupdater_types.js
@@ -0,0 +1,127 @@
+//
+// Autogenerated by Thrift
+//
+// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+//
+var Thrift = require('thrift').Thrift;
+var ttypes = module.exports = {};
+var BucketStoreMappingException = module.exports.BucketStoreMappingException = function(args){
+ Thrift.TException.call(this, "BucketStoreMappingException")
+ this.name = "BucketStoreMappingException"
+ this.message = null
+ this.code = null
+if( args != null ){ if (null != args.message)
+ this.message = args.message
+ if (null != args.code)
+ this.code = args.code
+}}
+Thrift.inherits(BucketStoreMappingException, Thrift.TException)
+BucketStoreMappingException.prototype.read = function(input){
+ var ret = input.readStructBegin()
+ while (1)
+ {
+ var ret = input.readFieldBegin()
+ var fname = ret.fname
+ var ftype = ret.ftype
+ var fid = ret.fid
+ if (ftype == Thrift.Type.STOP)
+ break
+ switch(fid)
+ {
+ case 1: if (ftype == Thrift.Type.STRING) {
+ this.message = input.readString()
+ } else {
+ input.skip(ftype)
+ }
+ break
+ case 2: if (ftype == Thrift.Type.I32) {
+ this.code = input.readI32()
+ } else {
+ input.skip(ftype)
+ }
+ break
+ default:
+ input.skip(ftype)
+ }
+ input.readFieldEnd()
+ }
+ input.readStructEnd()
+ return
+}
+
+BucketStoreMappingException.prototype.write = function(output){
+ output.writeStructBegin('BucketStoreMappingException')
+ if (null != this.message) {
+ output.writeFieldBegin('message', Thrift.Type.STRING, 1)
+ output.writeString(this.message)
+ output.writeFieldEnd()
+ }
+ if (null != this.code) {
+ output.writeFieldBegin('code', Thrift.Type.I32, 2)
+ output.writeI32(this.code)
+ output.writeFieldEnd()
+ }
+ output.writeFieldStop()
+ output.writeStructEnd()
+ return
+}
+
+var HostPort = module.exports.HostPort = function(args){
+ this.host = null
+ this.port = null
+if( args != null ){ if (null != args.host)
+ this.host = args.host
+ if (null != args.port)
+ this.port = args.port
+}}
+HostPort.prototype = {}
+HostPort.prototype.read = function(input){
+ var ret = input.readStructBegin()
+ while (1)
+ {
+ var ret = input.readFieldBegin()
+ var fname = ret.fname
+ var ftype = ret.ftype
+ var fid = ret.fid
+ if (ftype == Thrift.Type.STOP)
+ break
+ switch(fid)
+ {
+ case 2: if (ftype == Thrift.Type.STRING) {
+ this.host = input.readString()
+ } else {
+ input.skip(ftype)
+ }
+ break
+ case 3: if (ftype == Thrift.Type.I32) {
+ this.port = input.readI32()
+ } else {
+ input.skip(ftype)
+ }
+ break
+ default:
+ input.skip(ftype)
+ }
+ input.readFieldEnd()
+ }
+ input.readStructEnd()
+ return
+}
+
+HostPort.prototype.write = function(output){
+ output.writeStructBegin('HostPort')
+ if (null != this.host) {
+ output.writeFieldBegin('host', Thrift.Type.STRING, 2)
+ output.writeString(this.host)
+ output.writeFieldEnd()
+ }
+ if (null != this.port) {
+ output.writeFieldBegin('port', Thrift.Type.I32, 3)
+ output.writeI32(this.port)
+ output.writeFieldEnd()
+ }
+ output.writeFieldStop()
+ output.writeStructEnd()
+ return
+}
+
View
15 examples/gen-nodejs/fb303_types.js
@@ -0,0 +1,15 @@
+//
+// Autogenerated by Thrift
+//
+// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+//
+var Thrift = require('thrift').Thrift;
+var ttypes = module.exports = {};
+ttypes.fb_status = {
+'DEAD' : 0
+,'STARTING' : 1
+,'ALIVE' : 2
+,'STOPPING' : 3
+,'STOPPED' : 4
+,'WARNING' : 5
+}
View
206 examples/gen-nodejs/scribe.js
@@ -0,0 +1,206 @@
+//
+// Autogenerated by Thrift
+//
+// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+//
+var Thrift = require('thrift').Thrift;
+var fb303_ttypes = require('./fb303_types')
+
+
+var FacebookService = require('./FacebookService')
+var FacebookServiceClient = FacebookService.Client
+var ttypes = require('./scribe_types');
+//HELPER FUNCTIONS AND STRUCTURES
+
+var scribe_Log_args = function(args){
+ this.messages = null
+if( args != null ){ if (null != args.messages)
+ this.messages = args.messages
+}}
+scribe_Log_args.prototype = {}
+scribe_Log_args.prototype.read = function(input){
+ var ret = input.readStructBegin()
+ while (1)
+ {
+ var ret = input.readFieldBegin()
+ var fname = ret.fname
+ var ftype = ret.ftype
+ var fid = ret.fid
+ if (ftype == Thrift.Type.STOP)
+ break
+ switch(fid)
+ {
+ case 1: if (ftype == Thrift.Type.LIST) {
+ {
+ var _size0 = 0
+ var rtmp3
+ this.messages = []
+ var _etype3 = 0
+ rtmp3 = input.readListBegin()
+ _etype3 = rtmp3.etype
+ _size0 = rtmp3.size
+ for (var _i4 = 0; _i4 < _size0; ++_i4)
+ {
+ var elem5 = null
+ elem5 = new ttypes.LogEntry()
+ elem5.read(input)
+ this.messages.push(elem5)
+ }
+ input.readListEnd()
+ }
+ } else {
+ input.skip(ftype)
+ }
+ break
+ default:
+ input.skip(ftype)
+ }
+ input.readFieldEnd()
+ }
+ input.readStructEnd()
+ return
+}
+
+scribe_Log_args.prototype.write = function(output){
+ output.writeStructBegin('scribe_Log_args')
+ if (null != this.messages) {
+ output.writeFieldBegin('messages', Thrift.Type.LIST, 1)
+ {
+ output.writeListBegin(Thrift.Type.STRUCT, this.messages.length)
+ {
+ for(var iter6 in this.messages)
+ {
+ if (this.messages.hasOwnProperty(iter6))
+ {
+ iter6=this.messages[iter6]
+ iter6.write(output)
+ }
+ }
+ }
+ output.writeListEnd()
+ }
+ output.writeFieldEnd()
+ }
+ output.writeFieldStop()
+ output.writeStructEnd()
+ return
+}
+
+var scribe_Log_result = function(args){
+ this.success = null
+if( args != null ){ if (null != args.success)
+ this.success = args.success
+}}
+scribe_Log_result.prototype = {}
+scribe_Log_result.prototype.read = function(input){
+ var ret = input.readStructBegin()
+ while (1)
+ {
+ var ret = input.readFieldBegin()
+ var fname = ret.fname
+ var ftype = ret.ftype
+ var fid = ret.fid
+ if (ftype == Thrift.Type.STOP)
+ break
+ switch(fid)
+ {
+ case 0: if (ftype == Thrift.Type.I32) {
+ this.success = input.readI32()
+ } else {
+ input.skip(ftype)
+ }
+ break
+ default:
+ input.skip(ftype)
+ }
+ input.readFieldEnd()
+ }
+ input.readStructEnd()
+ return
+}
+
+scribe_Log_result.prototype.write = function(output){
+ output.writeStructBegin('scribe_Log_result')
+ if (null != this.success) {
+ output.writeFieldBegin('success', Thrift.Type.I32, 0)
+ output.writeI32(this.success)
+ output.writeFieldEnd()
+ }
+ output.writeFieldStop()
+ output.writeStructEnd()
+ return
+}
+
+var scribeClient = exports.Client = function(output, pClass) {
+ this.output = output;
+ this.pClass = pClass;
+ this.seqid = 0;
+ this._reqs = {}
+}
+Thrift.inherits(scribeClient, FacebookServiceClient)
+scribeClient.prototype.Log = function(messages,callback){
+ this.seqid += 1;
+ this._reqs[this.seqid] = callback;
+ this.send_Log(messages)
+}
+
+scribeClient.prototype.send_Log = function(messages){
+ var output = new this.pClass(this.output);
+ output.writeMessageBegin('Log', Thrift.MessageType.CALL, this.seqid)
+ var args = new scribe_Log_args()
+ args.messages = messages
+ args.write(output)
+ output.writeMessageEnd()
+ return this.output.flush()
+}
+
+scribeClient.prototype.recv_Log = function(input,mtype,rseqid){
+ var callback = this._reqs[rseqid] || function() {};
+ delete this._reqs[rseqid];
+ if (mtype == Thrift.MessageType.EXCEPTION) {
+ var x = new Thrift.TApplicationException()
+ x.read(input)
+ input.readMessageEnd()
+ return callback(x);
+ }
+ var result = new scribe_Log_result()
+ result.read(input)
+ input.readMessageEnd()
+
+ if (null != result.success ) {
+ return callback(null, result.success);
+ }
+ return callback("Log failed: unknown result");
+}
+var scribeProcessor = exports.Processor = function(handler) {
+ this._handler = handler
+}
+scribeProcessor.prototype.process = function(input, output) {
+ var r = input.readMessageBegin()
+ if (this['process_' + r.fname]) {
+ return this['process_' + r.fname].call(this, r.rseqid, input, output)
+ } else {
+ input.skip(Thrift.Type.STRUCT)
+ input.readMessageEnd()
+ var x = new Thrift.TApplicationException(Thrift.TApplicationExceptionType.UNKNOWN_METHOD, 'Unknown function ' + r.fname)
+ output.writeMessageBegin(r.fname, Thrift.MessageType.Exception, r.rseqid)
+ x.write(output)
+ output.writeMessageEnd()
+ output.flush()
+ }
+}
+
+scribeProcessor.prototype.process_Log = function(seqid, input, output) {
+ var args = new scribe_Log_args()
+ args.read(input)
+ input.readMessageEnd()
+ var result = new scribe_Log_result()
+ this._handler.Log(args.messages, function(success) {
+ result.success = success
+ output.writeMessageBegin("Log", Thrift.MessageType.REPLY, seqid)
+ result.write(output)
+ output.writeMessageEnd()
+ output.flush()
+ })
+}
+
View
70 examples/gen-nodejs/scribe_types.js
@@ -0,0 +1,70 @@
+//
+// Autogenerated by Thrift
+//
+// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+//
+var Thrift = require('thrift').Thrift;
+var ttypes = module.exports = {};
+ttypes.ResultCode = {
+'OK' : 0
+,'TRY_LATER' : 1
+}
+var LogEntry = module.exports.LogEntry = function(args){
+ this.category = null
+ this.message = null
+if( args != null ){ if (null != args.category)
+ this.category = args.category
+ if (null != args.message)
+ this.message = args.message
+}}
+LogEntry.prototype = {}
+LogEntry.prototype.read = function(input){
+ var ret = input.readStructBegin()
+ while (1)
+ {
+ var ret = input.readFieldBegin()
+ var fname = ret.fname
+ var ftype = ret.ftype
+ var fid = ret.fid
+ if (ftype == Thrift.Type.STOP)
+ break
+ switch(fid)
+ {
+ case 1: if (ftype == Thrift.Type.STRING) {
+ this.category = input.readString()
+ } else {
+ input.skip(ftype)
+ }
+ break
+ case 2: if (ftype == Thrift.Type.STRING) {
+ this.message = input.readString()
+ } else {
+ input.skip(ftype)
+ }
+ break
+ default:
+ input.skip(ftype)
+ }
+ input.readFieldEnd()
+ }
+ input.readStructEnd()
+ return
+}
+
+LogEntry.prototype.write = function(output){
+ output.writeStructBegin('LogEntry')
+ if (null != this.category) {
+ output.writeFieldBegin('category', Thrift.Type.STRING, 1)
+ output.writeString(this.category)
+ output.writeFieldEnd()
+ }
+ if (null != this.message) {
+ output.writeFieldBegin('message', Thrift.Type.STRING, 2)
+ output.writeString(this.message)
+ output.writeFieldEnd()
+ }
+ output.writeFieldStop()
+ output.writeStructEnd()
+ return
+}
+
View
42 examples/scribe_client.js
@@ -0,0 +1,42 @@
+/* HOW TO prepare gen-nodejs
+ $ thrift -I ~/tmp/scribe -I ~/tmp/thrift-0.6.0/contrib --gen js:node ~/tmp/scribe/if/scribe.thrift
+ $ thrift -I ~/tmp/scribe -I ~/tmp/thrift-0.6.0/contrib --gen js:node ~/tmp/scribe/if/bucketupdater.thrift
+ $ thrift -I ~/tmp/scribe -I ~/tmp/thrift-0.6.0/contrib --gen js:node ~/tmp/thrift-0.6.0/contrib/fb303/if/fb303.thrift
+ */
+var thrift = require('thrift'),
+ scribe = require('./gen-nodejs/scribe'),
+ ttypes = require('./gen-nodejs/scribe_types');
+
+var connection = thrift.createConnection('localhost', 1463),
+ client = thrift.createClient(scribe, connection);
+
+var counter = 0;
+
+var str_times = function(str,times) {
+ var r = '';
+ for (var i = 0; i < times; i++) {
+ r += str;
+ }
+ return r;
+};
+
+var push_lines = function() {
+ var num = Math.floor(Math.random() * 100);
+ var now = '[' + (new Date()).toString() + ']';
+ var logheader = '192.168.0.1 - - ' + now + ' ';
+ var lines = [];
+ for (var i = 0; i < num; i++) {
+ lines.push(logheader + (counter + i) + ' ' + str_times('x', Math.floor(Math.random() * 100)) + "\n");
+ }
+ var logs = lines.map(function(x){ return new ttypes.LogEntry({category: 'thrifttest', message: x}); });
+ console.log("transferring...", logs.length);
+ var result = client.Log(logs, function(err, success){ if (! err){ counter += num; }});
+};
+
+var loop_id = setInterval(push_lines, 1000);
+
+setTimeout(function() {
+ clearInterval(loop_id);
+ connection.end();
+ console.log("transferred:", counter);
+}, 100 * 1000);
View
28 examples/server_multitransport.js
@@ -0,0 +1,28 @@
+var thrift = require('thrift'),
+ ttransport = require('thrift/transport');
+
+var UserStorage = require('./gen-nodejs/UserStorage'),
+ ttypes = require('./gen-nodejs/user_types');
+
+var users = {};
+
+var store = function(user, success) {
+ console.log("stored:", user.uid);
+ users[user.uid] = user;
+ success();
+};
+var retrieve = function(uid, success) {
+ console.log("retrieved:", uid);
+ success(users[uid]);
+};
+
+var server_framed = thrift.createServer(UserStorage, {
+ store: store,
+ retrieve: retrieve
+});
+server_framed.listen(9090);
+var server_buffered = thrift.createServer(UserStorage, {
+ store: store,
+ retrieve: retrieve
+}, {transport: ttransport.TBufferedTransport});
+server_buffered.listen(9091);
View
122 lib/thrift/connection.js
@@ -1,72 +1,26 @@
var sys = require('sys'),
EventEmitter = require("events").EventEmitter,
net = require('net'),
- TMemoryBuffer = require('./transport').TMemoryBuffer,
- TBinaryProtocol = require('./protocol').TBinaryProtocol;
+ ttransport = require('./transport'),
+ tprotocol = require('./protocol');
var BinaryParser = require('./binary_parser').BinaryParser;
BinaryParser.bigEndian = true;
-var int32FramedReceiver = exports.int32FramedReceiver = function (callback) {
- var frameLeft = 0,
- framePos = 0,
- frame = null;
- var residual = null;
-
- return function(data) {
- // Prepend any residual data from our previous read
- if (residual) {
- var dat = new Buffer(data.length + residual.length);
- residual.copy(dat, 0, 0);
- data.copy(dat, residual.length, 0);
- residual = null;
- }
-
- // var buf = new Buffer(data, 'binary');
- // console.log(buf);
- // framed transport
- while (data.length) {
- if (frameLeft === 0) {
- // TODO assumes we have all 4 bytes
- if (data.length < 4) {
- console.log("Expecting > 4 bytes, found only " + data.length);
- residual = data;
- break;
- //throw Error("Expecting > 4 bytes, found only " + data.length);
- }
- frameLeft = BinaryParser.toInt(data.slice(0,4));
- frame = new Buffer(frameLeft);
- framePos = 0;
- data = data.slice(4, data.length);
- }
-
- if (data.length >= frameLeft) {
- data.copy(frame, framePos, 0, frameLeft);
- data = data.slice(frameLeft, data.length);
-
- frameLeft = 0;
- callback(frame);
- } else if (data.length) {
- data.copy(frame, framePos, 0, data.length);
- frameLeft -= data.length;
- framePos += data.length;
- data = data.slice(data.length, data.length);
- }
- }
- };
-};
-
var Connection = exports.Connection = function(stream, options) {
var self = this;
EventEmitter.call(this);
this.connection = stream;
- this.offline_queue = [];
this.options = options || {};
+ this.transport = this.options.transport || ttransport.TFramedTransport;
+ this.protocol = this.options.protocol || tprotocol.TBinaryProtocol;
+ this.offline_queue = [];
this.connected = false;
this.connection.addListener("connect", function() {
self.connected = true;
+
this.setTimeout(self.options.timeout || 0);
this.setNoDelay();
this.frameLeft = 0;
@@ -93,17 +47,47 @@ var Connection = exports.Connection = function(stream, options) {
self.emit("timeout");
});
- this.connection.addListener("data", int32FramedReceiver(function(data) {
- // console.log(typeof(data));
- var input = new TBinaryProtocol(new TMemoryBuffer(data));
- var r = input.readMessageBegin();
- // console.log(r);
- self.client['recv_' + r.fname](input, r.mtype, r.rseqid);
- // self.emit("data", data);
+ this.connection.addListener("data", self.transport.receiver(function(transport_with_data) {
+ var message = new self.protocol(transport_with_data);
+ try {
+ var header = message.readMessageBegin();
+ var dummy_seqid = header.rseqid * -1;
+ var client = self.client;
+ client._reqs[dummy_seqid] = function(err, success){
+ transport_with_data.commitPosition();
+
+ var callback = client._reqs[header.rseqid];
+ delete client._reqs[header.rseqid];
+ if (callback) {
+ callback(err, success);
+ }
+ };
+ client['recv_' + header.fname](message, header.mtype, dummy_seqid);
+ }
+ catch (e) {
+ if (e instanceof ttransport.InputBufferUnderrunError) {
+ transport_with_data.rollbackPosition();
+ }
+ else {
+ throw e;
+ }
+ }
}));
};
sys.inherits(Connection, EventEmitter);
+Connection.prototype.end = function() {
+ this.connection.end();
+}
+
+Connection.prototype.write = function(data) {
+ if (!this.connected) {
+ this.offline_queue.push(data);
+ return;
+ }
+ this.connection.write(data);
+}
+
exports.createConnection = function(host, port, options) {
var stream = net.createConnection(port, host);
var connection = new Connection(stream, options);
@@ -117,28 +101,12 @@ exports.createClient = function(cls, connection) {
if (cls.Client) {
cls = cls.Client;
}
- var client = new cls(new TMemoryBuffer(undefined, function(buf) {
+ var client = new cls(new connection.transport(undefined, function(buf) {
connection.write(buf);
- }), TBinaryProtocol);
+ }), connection.protocol);
// TODO clean this up
connection.client = client;
return client;
}
-
-Connection.prototype.end = function() {
- this.connection.end();
-}
-
-Connection.prototype.write = function(buf) {
- // TODO: optimize this better, allocate one buffer instead of both:
- var msg = new Buffer(buf.length + 4);
- BinaryParser.fromInt(buf.length).copy(msg, 0, 0, 4);
- buf.copy(msg, 4, 0, buf.length);
- if (!this.connected) {
- this.offline_queue.push(msg);
- } else {
- this.connection.write(msg);
- }
-}
View
36 lib/thrift/server.js
@@ -1,33 +1,41 @@
var sys = require('sys'),
net = require('net');
+var ttransport = require('./transport');
var BinaryParser = require('./binary_parser').BinaryParser,
- TMemoryBuffer = require('./transport').TMemoryBuffer,
- TBinaryProtocol = require('./protocol').TBinaryProtocol,
- int32FramedReceiver = require('./connection').int32FramedReceiver;
+ TBinaryProtocol = require('./protocol').TBinaryProtocol;
-exports.createServer = function(cls, handler) {
+exports.createServer = function(cls, handler, options) {
if (cls.Processor) {
cls = cls.Processor;
}
var processor = new cls(handler);
+ var transport = (options && options.transport) ? options.transport : ttransport.TFramedTransport;
+ var protocol = (options && options.protocol) ? options.protocol : TBinaryProtocol;
return net.createServer(function(stream) {
- stream.on('data', int32FramedReceiver(function(data) {
- var input = new TBinaryProtocol(new TMemoryBuffer(data));
- var output = new TBinaryProtocol(new TMemoryBuffer(undefined, function(buf) {
- // TODO: optimize this better, allocate one buffer instead of both:
- var msg = new Buffer(buf.length + 4);
- BinaryParser.fromInt(buf.length).copy(msg, 0, 0, 4);
- buf.copy(msg, 4, 0, buf.length);
- stream.write(msg);
+ stream.on('data', transport.receiver(function(transport_with_data) {
+ var input = new protocol(transport_with_data);
+ var output = new protocol(new transport(undefined, function(buf) {
+ stream.write(buf);
}));
- processor.process(input, output);
+ try {
+ processor.process(input, output);
+ transport_with_data.commitPosition();
+ }
+ catch (e) {
+ if (e instanceof ttransport.InputBufferUnderrunError) {
+ transport_with_data.rollbackPosition();
+ }
+ else {
+ throw e;
+ }
+ }
}));
stream.on('end', function() {
stream.end();
});
});
-}
+};
View
175 lib/thrift/transport.js
@@ -1,20 +1,74 @@
+var BinaryParser = require('./binary_parser').BinaryParser;
+
var emptyBuf = new Buffer(0);
-var TMemoryBuffer = exports.TMemoryBuffer = function(buffer, callback) {
+var InputBufferUnderrunError = exports.InputBufferUnderrunError = function() {
+};
+
+var TFramedTransport = exports.TFramedTransport = function(buffer, callback) {
this.inBuf = buffer || emptyBuf;
this.outBuffers = [];
this.outCount = 0;
this.readPos = 0;
this.onFlush = callback;
};
+TFramedTransport.receiver = function(callback) {
+ var frameLeft = 0,
+ framePos = 0,
+ frame = null;
+ var residual = null;
+
+ return function(data) {
+ // Prepend any residual data from our previous read
+ if (residual) {
+ var dat = new Buffer(data.length + residual.length);
+ residual.copy(dat, 0, 0);
+ data.copy(dat, residual.length, 0);
+ residual = null;
+ }
+
+ // framed transport
+ while (data.length) {
+ if (frameLeft === 0) {
+ // TODO assumes we have all 4 bytes
+ if (data.length < 4) {
+ console.log("Expecting > 4 bytes, found only " + data.length);
+ residual = data;
+ break;
+ //throw Error("Expecting > 4 bytes, found only " + data.length);
+ }
+ frameLeft = BinaryParser.toInt(data.slice(0,4));
+ frame = new Buffer(frameLeft);
+ framePos = 0;
+ data = data.slice(4, data.length);
+ }
+
+ if (data.length >= frameLeft) {
+ data.copy(frame, framePos, 0, frameLeft);
+ data = data.slice(frameLeft, data.length);
+
+ frameLeft = 0;
+ callback(new TFramedTransport(frame));
+ } else if (data.length) {
+ data.copy(frame, framePos, 0, data.length);
+ frameLeft -= data.length;
+ framePos += data.length;
+ data = data.slice(data.length, data.length);
+ }
+ }
+ };
+};
+
+TFramedTransport.prototype = {
+ commitPosition: function(){},
+ rollbackPosition: function(){},
-TMemoryBuffer.prototype = {
// TODO: Implement open/close support
isOpen: function() {return true;},
open: function() {},
close: function() {},
- read: function(len) {
+ read: function(len) { // this function will be used for each frames.
var end = this.readPos + len;
if (this.inBuf.length < end) {
@@ -41,14 +95,125 @@ TMemoryBuffer.prototype = {
},
flush: function() {
- var out = new Buffer(this.outCount), pos = 0;
+ var out = new Buffer(this.outCount),
+ pos = 0;
this.outBuffers.forEach(function(buf) {
buf.copy(out, pos, 0);
pos += buf.length;
});
+
+ if (this.onFlush) {
+ // TODO: optimize this better, allocate one buffer instead of both:
+ var msg = new Buffer(out.length + 4);
+ BinaryParser.fromInt(out.length).copy(msg, 0, 0, 4);
+ out.copy(msg, 4, 0, out.length);
+ this.onFlush(msg);
+ }
+
+ this.outBuffers = [];
+ this.outCount = 0;
+ }
+};
+
+var TBufferedTransport = exports.TBufferedTransport = function(buffer, callback) {
+ this.defaultReadBufferSize = 1024;
+ this.writeBufferSize = 512; // Soft Limit
+ this.inBuf = new Buffer(this.defaultReadBufferSize);
+ this.readCursor = 0;
+ this.writeCursor = 0; // for input buffer
+ this.outBuffers = [];
+ this.outCount = 0;
+ this.onFlush = callback;
+};
+TBufferedTransport.receiver = function(callback) {
+ var reader = new TBufferedTransport();
+
+ return function(data) {
+ if (reader.writeCursor + data.length > reader.inBuf.length) {
+ var buf = new Buffer(reader.writeCursor + data.length);
+ reader.inBuf.copy(buf, 0, 0, reader.writeCursor);
+ reader.inBuf = buf;
+ }
+ data.copy(reader.inBuf, reader.writeCursor, 0);
+ reader.writeCursor += data.length;
+
+ callback(reader);
+ };
+};
+
+TBufferedTransport.prototype = {
+ commitPosition: function(){
+ var unreadedSize = this.writeCursor - this.readCursor;
+ var bufSize = (unreadedSize * 2 > this.defaultReadBufferSize) ? unreadedSize * 2 : this.defaultReadBufferSize;
+ var buf = new Buffer(bufSize);
+ if (unreadedSize > 0) {
+ this.inBuf.copy(buf, 0, this.readCursor, unreadedSize);
+ }
+ this.readCursor = 0;
+ this.writeCursor = unreadedSize;
+ this.inBuf = buf;
+ },
+ rollbackPosition: function(){
+ this.readCursor = 0;
+ },
+
+ // TODO: Implement open/close support
+ isOpen: function() {return true;},
+ open: function() {},
+ close: function() {},
+
+ read: function(len) {
+ if (this.readCursor + len > this.writeCursor) {
+ throw new InputBufferUnderrunError();
+ }
+ var buf = new Buffer(len);
+ this.inBuf.copy(buf, 0, this.readCursor, this.readCursor + len);
+ this.readCursor += len;
+ return buf;
+ },
+
+ readAll: function() {
+ if (this.readCursor >= this.writeCursor) {
+ throw new InputBufferUnderrunError();
+ }
+ var buf = new Buffer(this.writeCursor - this.readCursor);
+ this.inBuf.copy(buf, 0, this.readCursor, this.writeCursor - this.readCursor);
+ this.readCursor = this.writeCursor;
+ return buf;
+ },
+
+ write: function(buf, encoding) {
+ if (typeof(buf) === "string") {
+ // Defaulting to ascii encoding here since that's more like the original
+ // code, but I feel like 'utf8' would be a better choice.
+ buf = new Buffer(buf, encoding || 'ascii');
+ }
+ if (this.outCount + buf.length > this.writeBufferSize) {
+ this.flush();
+ }
+
+ this.outBuffers.push(buf);
+ this.outCount += buf.length;
+ if (this.outCount >= this.writeBufferSize) {
+ this.flush();
+ }
+ },
+
+ flush: function() {
+ if (this.outCount < 1) {
+ return;
+ }
+
+ var msg = new Buffer(this.outCount),
+ pos = 0;
+ this.outBuffers.forEach(function(buf) {
+ buf.copy(msg, pos, 0);
+ pos += buf.length;
+ });
+
if (this.onFlush) {
- this.onFlush(out);
+ this.onFlush(msg);
}
this.outBuffers = [];

0 comments on commit ae8be1f

Please sign in to comment.
Something went wrong with that request. Please try again.