From ef5fa1ccc1124c889aab7b3527bef95b9c6c5a06 Mon Sep 17 00:00:00 2001 From: Bent Cardan Date: Mon, 5 Jan 2015 21:27:42 -0500 Subject: [PATCH] add stream socket type --- binding.cc | 5 +++- lib/index.js | 1 + test/socket.stream.js | 65 +++++++++++++++++++++++++++++++++++++++++++ windows/include/zmq.h | 1 + 4 files changed, 71 insertions(+), 1 deletion(-) create mode 100644 test/socket.stream.js diff --git a/binding.cc b/binding.cc index 63df259..981f296 100644 --- a/binding.cc +++ b/binding.cc @@ -1227,7 +1227,10 @@ namespace zmq { NODE_DEFINE_CONSTANT(target, ZMQ_PUSH); NODE_DEFINE_CONSTANT(target, ZMQ_PULL); NODE_DEFINE_CONSTANT(target, ZMQ_PAIR); - + #if ZMQ_VERSION_MAJOR >= 4 + NODE_DEFINE_CONSTANT(target, ZMQ_STREAM); + #endif + NODE_DEFINE_CONSTANT(target, ZMQ_POLLIN); NODE_DEFINE_CONSTANT(target, ZMQ_POLLOUT); NODE_DEFINE_CONSTANT(target, ZMQ_POLLERR); diff --git a/lib/index.js b/lib/index.js index 1cfeae6..11fc17e 100644 --- a/lib/index.js +++ b/lib/index.js @@ -36,6 +36,7 @@ var types = exports.types = { , dealer: zmq.ZMQ_DEALER , router: zmq.ZMQ_ROUTER , pair: zmq.ZMQ_PAIR + , stream: zmq.ZMQ_STREAM }; var longOptions = { diff --git a/test/socket.stream.js b/test/socket.stream.js new file mode 100644 index 0000000..211a091 --- /dev/null +++ b/test/socket.stream.js @@ -0,0 +1,65 @@ +var zmq = require('..') + , http = require('http') + , should = require('should') + , semver = require('semver'); + +describe('socket.stream', function(){ + + //since its for libzmq4+, we target versions > 4.0.0 + var version = semver.lte(zmq.version, '4.0.0'); + + it('should support a streaming socket', function (done){ + + if (!version) { + done(); + return console.warn('stream socket type in libzmq v4+'); + } + + var stream = zmq.socket('stream'); + stream.on('message', function (id,msg){ + + msg.should.be.an.instanceof(Buffer); + + var raw_header = String(msg).split('\r\n'); + var method = raw_header[0].split(' ')[0]; + method.should.equal('GET'); + + //due to HTTP GET method, prepare HTTP response for TCP socket + var httpProtocolString = 'HTTP/1.0 200 OK\r\n' //status code + + 'Content-Type: text/html\r\n' //headers + + '\r\n' + + '' //response body + + '' //make it xml, json, html or something else + + '' + + '' + + '' + +'

derpin over protocols

' + + '' + +'' + + //zmq streaming prefixed by envelope's routing identifier + stream.send([id,httpProtocolString]); + }); + + var addr = '127.0.0.1:47080'; + stream.bind('tcp://'+addr, function(){ + //send non-peer request to zmq, like an http GET method with URI path + http.get('http://'+addr+'/aRandomRequestPath', function (httpMsg){ + + //it's a readable stream as the good lord intended + httpMsg.socket._readableState.reading.should.be.true + + //conventional node streams emit data events to process zmq stream response + httpMsg.on('data',function (msg){ + msg.should.be.an.instanceof(Buffer); + String(msg).should.equal('' + +'' + +'

derpin over protocols

' + +'' + +''); + done(); + }); + }); + }); + }); +}); \ No newline at end of file diff --git a/windows/include/zmq.h b/windows/include/zmq.h index 9678b87..57fb9a1 100644 --- a/windows/include/zmq.h +++ b/windows/include/zmq.h @@ -212,6 +212,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); #define ZMQ_PUSH 8 #define ZMQ_XPUB 9 #define ZMQ_XSUB 10 +#define ZMQ_STREAM 11 /* Deprecated aliases */ #define ZMQ_XREQ ZMQ_DEALER