Browse files

Merge pull request #34 from visionmedia/add/req-send-callback

add req.send() callback
  • Loading branch information...
2 parents cf343c3 + 601c2ae commit 1355a66812c327d7adbe3aab04e29fbb4a03f884 @tj committed Sep 27, 2012
View
153 Readme.md
@@ -18,10 +18,8 @@
- push / pull
- pub / sub
- - emitter
- req / rep
- - router
- - dealer
+ - emitter
## Push / Pull
@@ -94,157 +92,142 @@ sock.on('message', function(msg){
});
```
-## EmitterSocket
-
-`EmitterSocket`'s send and receive messages behaving like regular node `EventEmitter`s.
-This is achieved by using pub / sub sockets behind the scenes and automatically formatting
-messages with the "json" codec. Currently we simply define the `EmitterSocket` as a `PubSocket` if you `.bind()`, and `SubSocket` if you `.connect()`, providing the natural API you're used to:
+## Req / Rep
-server.js:
+`ReqSocket` is similar to a `PushSocket` in that it round-robins messages
+to connected `RepSocket`s, however it differs in that this communication is
+bi-directional, every `req.send()` _must_ provide a callback which is invoked
+when the `RepSocket` replies.
```js
var axon = require('axon')
- , sock = axon.socket('emitter');
+ , sock = axon.socket('req');
-sock.bind(3000);
-console.log('pub server started');
+req.bind(3000);
-setInterval(function(){
- sock.emit('login', { name: 'tobi' });
-}, 500);
+req.send(img, function(res){
+
+});
```
-client.js:
+`RepSocket`s receive a `reply` callback that is used to respond to the request,
+you may have several of these nodes.
```js
var axon = require('axon')
- , sock = axon.socket('emitter');
+ , sock = axon.socket('rep');
sock.connect(3000);
-console.log('sub client connected');
-sock.on('login', function(user){
- console.log('%s signed in', user.name);
+sock.on('message', function(img, reply){
+ // resize the image
+ reply(img);
});
```
-## Req / Rep
-
-`ReqSocket`s send and receive messages, queueing messages on remote disconnects. There
-is no "lock step" involved, allowing messages sent later to receive replies prior to
-previously sent messages. `RepSocket`s reply to received messages, there is no concept of `send()`. Each
-received message will have a `reply` callback, which will send the response back to the remote peer:
+ Like other sockets you may provide multiple arguments or an array of arguments,
+ followed by the callbacks. For example here we provide a task name of "resize"
+ to facilitate multiple tasks over a single socket:
-client.js
```js
var axon = require('axon')
, sock = axon.socket('req');
-sock.connect(3000);
+req.bind(3000);
-sock.on('message', function(msg){
- console.log('got: %s', msg.toString());
+req.send('resize', img, function(res){
+
});
-
-setInterval(function(){
- sock.send('ping');
-}, 150);
```
-server.js
-```js
+`RepSocket`s receive a `reply` callback that is used to respond to the request,
+you may have several of these nodes.
+```js
var axon = require('axon')
, sock = axon.socket('rep');
-sock.bind(3000);
+sock.connect(3000);
-sock.on('message', function(msg, reply){
- console.log('got: %s', msg.toString());
- reply('pong');
+sock.on('message', function(task, img, reply){
+ switch (task.toString()) {
+ case 'resize':
+ // resize the image
+ reply(img);
+ break;
+ }
});
```
-## Router
+## EmitterSocket
-`RouterSocket`s send a message to an "identified" peer using the socket's "identity"
-(see `socket options`). Sent messages are not queued. The message sent leverages
-multipart messages by framing the "identity" first, the delimiter second, and then
-the actual message body.
+`EmitterSocket`'s send and receive messages behaving like regular node `EventEmitter`s.
+This is achieved by using pub / sub sockets behind the scenes and automatically formatting
+messages with the "json" codec. Currently we simply define the `EmitterSocket` as a `PubSocket` if you `.bind()`, and `SubSocket` if you `.connect()`, providing the natural API you're used to:
-__Note:__ This will probably change due to the awkwardness of handling your own delimiters.
+server.js:
-client.js
```js
var axon = require('axon')
- , sock = axon.socket('router');
+ , sock = axon.socket('emitter');
sock.bind(3000);
-
-sock.on('message', function(from, delim, msg){
- console.log(msg.toString());
-});
+console.log('pub server started');
setInterval(function(){
- sock.send('foo', '\u0000', 'hello foo');
- sock.send('bar', '\u0000', 'hello bar');
-}, 150);
+ sock.emit('login', { name: 'tobi' });
+}, 500);
```
-server.js
+client.js:
+
```js
var axon = require('axon')
- , foo = axon.socket('rep')
- , bar = axon.socket('rep');
-
-foo.set('identity', 'foo');
-foo.connect(3000);
-
-foo.on('message', function(msg, reply){
- reply('foo: pong');
-});
+ , sock = axon.socket('emitter');
-bar.set('identity', 'bar');
-bar.connect(3000);
+sock.connect(3000);
+console.log('sub client connected');
-bar.on('message', function(msg, reply){
- reply('bar says: pong');
+sock.on('login', function(user){
+ console.log('%s signed in', user.name);
});
```
-## Dealer
+## Req / Rep
-`DealerSocket`s receive messages and round-robin sent messages. There is no
-correlation between the two. They can be thought of as a `PushSocket` and `PullSocket`
-combined. Here the dealer the serves as an "echo-service", sending whatever is receives:
+`ReqSocket`s send and receive messages, queueing messages on remote disconnects. There
+is no "lock step" involved, allowing messages sent later to receive replies prior to
+previously sent messages. `RepSocket`s reply to received messages, there is no concept of `send()`. Each
+received message will have a `reply` callback, which will send the response back to the remote peer:
-dealer.js
+client.js
```js
var axon = require('axon')
- , sock = axon.socket('dealer');
+ , sock = axon.socket('req');
-sock.set('identity', 'echo-service');
sock.connect(3000);
sock.on('message', function(msg){
- sock.send(msg);
+ console.log('got: %s', msg.toString());
});
+
+setInterval(function(){
+ sock.send('ping');
+}, 150);
```
-client.js
+server.js
```js
+
var axon = require('axon')
- , sock = axon.socket('router');
+ , sock = axon.socket('rep');
sock.bind(3000);
-sock.on('message', function(from, msg){
- console.log('%s said: %s', from.toString(), msg.toString());
+sock.on('message', function(msg, reply){
+ console.log('got: %s', msg.toString());
+ reply('pong');
});
-
-setInterval(function(){
- sock.send('echo-service', 'hey tobi');
-}, 500);
```
## Socket Options
View
4 lib/index.js
@@ -12,8 +12,6 @@ exports.SubSocket = require('./sockets/sub');
exports.PushSocket = require('./sockets/push');
exports.PullSocket = require('./sockets/pull');
exports.EmitterSocket = require('./sockets/emitter');
-exports.RouterSocket = require('./sockets/router');
-exports.DealerSocket = require('./sockets/dealer');
exports.ReqSocket = require('./sockets/req');
exports.RepSocket = require('./sockets/rep');
@@ -28,8 +26,6 @@ exports.types = {
push: exports.PushSocket,
pull: exports.PullSocket,
emitter: exports.EmitterSocket,
- router: exports.RouterSocket,
- dealer: exports.DealerSocket,
req: exports.ReqSocket,
rep: exports.RepSocket
};
View
32 lib/sockets/dealer.js
@@ -1,32 +0,0 @@
-
-/**
- * Module dependencies.
- */
-
-var Socket = require('./sock')
- , queue = require('../plugins/queue')
- , roundrobin = require('../plugins/round-robin');
-
-/**
- * Expose `DealerSocket`.
- */
-
-module.exports = DealerSocket;
-
-/**
- * Initialize a new `DealerSocket`.
- *
- * @api private
- */
-
-function DealerSocket() {
- Socket.call(this);
- this.use(queue());
- this.use(roundrobin({ fallback: this.enqueue }));
-}
-
-/**
- * Inherits from `Socket.prototype`.
- */
-
-DealerSocket.prototype.__proto__ = Socket.prototype;
View
11 lib/sockets/rep.js
@@ -40,19 +40,14 @@ RepSocket.prototype.onmessage = function(sock){
var self = this;
return function (msg, multipart){
if (!multipart) return debug('rep expects multipart');
- var envelopes = [];
-
- for (var i = 0; i < msg.length; ++i) {
- if ('\u0000' === String(msg[i])) {
- envelopes = msg.splice(0, ++i);
- }
- }
+ var id = msg.pop();
self.emit.apply(self, ['message'].concat(msg, reply));
function reply(){
var args = [].slice.call(arguments);
- sock.write(self.pack(envelopes.concat(args)));
+ args.push(id);
+ sock.write(self.pack(args));
}
};
};
View
40 lib/sockets/req.js
@@ -7,7 +7,6 @@ var Socket = require('./sock')
, queue = require('../plugins/queue')
, debug = require('debug')('axon:req');
-
/**
* Expose `ReqSocket`.
*/
@@ -22,6 +21,10 @@ module.exports = ReqSocket;
function ReqSocket() {
Socket.call(this);
+ this.n = 0;
+ this.ids = 0;
+ this.callbacks = {};
+ this.identity = this.get('identity');
this.use(queue());
}
@@ -32,6 +35,17 @@ function ReqSocket() {
ReqSocket.prototype.__proto__ = Socket.prototype;
/**
+ * Return a message id.
+ *
+ * @return {String}
+ * @api private
+ */
+
+ReqSocket.prototype.id = function(){
+ return this.identity + ':' + this.ids++;
+};
+
+/**
* Emits the "message" event with all message parts
* after the null delimeter part.
*
@@ -44,8 +58,11 @@ ReqSocket.prototype.onmessage = function(){
var self = this;
return function(msg, multipart){
if (!multipart) return debug('expected multipart');
- if ('\u0000' != String(msg[0])) return debug('malformed message');
- self.emit.apply(self, ['message'].concat(msg.slice(1)));
+ var id = msg.pop();
+ var fn = self.callbacks[id];
+ if (!fn) return debug('missing callback %s', id);
+ fn.apply(null, msg);
+ delete self.callbacks[id];
};
};
@@ -58,7 +75,9 @@ ReqSocket.prototype.onmessage = function(){
*/
ReqSocket.prototype.send = function(msg){
- var sock = this.socks[0]
+ var socks = this.socks
+ , len = socks.length
+ , sock = socks[this.n++ % len]
, args = [];
if (Array.isArray(msg)) {
@@ -70,9 +89,18 @@ ReqSocket.prototype.send = function(msg){
}
if (sock) {
- sock.write(this.pack(['\u0000'].concat(args)));
+ if ('function' == typeof args[args.length - 1]) {
+ var fn = args.pop();
+ fn.id = this.id();
+ this.callbacks[fn.id] = fn;
+ args.push(fn.id);
+ }
+ }
+
+ if (sock) {
+ sock.write(this.pack(args));
} else {
debug('no connected peers');
- this.enqueue(msg);
+ this.enqueue(args);
}
};
View
72 lib/sockets/router.js
@@ -1,72 +0,0 @@
-
-/**
- * Module dependencies.
- */
-
-var Socket = require('./sock')
- , debug = require('debug')('axon:router');
-
-/**
- * Expose `RouterSocket`.
- */
-
-module.exports = RouterSocket;
-
-/**
- * Initialize a new `RouterSocket`.
- *
- * @api private
- */
-
-function RouterSocket() {
- Socket.call(this);
-}
-
-/**
- * Inherits from `Socket.prototype`.
- */
-
-RouterSocket.prototype.__proto__ = Socket.prototype;
-
-/**
- * Sent `msg` to a connected peer with
- * the identity `id`.
- *
- * @param {String} id
- * @param {Mixed} msg
- */
-
-RouterSocket.prototype.send = function(id, msg) {
- var args = [].slice.call(arguments);
-
- if (args.length < 2) throw new Error('identity is required');
- if ('string' != typeof args[0]) throw new Error('invalid identity');
-
- var sock = this.map[id];
-
- if (sock) {
- sock.write(this.pack(args.slice(1)));
- } else {
- debug('no peer "%s"', id);
- }
-};
-
-/**
- * Emits the "message" event with all message parts
- * after the null delimeter part. Uses the "identity"
- * from the remote peer as the "envelope" part.
- *
- * @param {net.Socket} sock
- * @return {Function} closure(msg, multipart)
- * @api private
- */
-
-RouterSocket.prototype.onmessage = function(socket){
- var self = this;
- return function(msg, multipart){
- var id = socket._axon_id;
- self.emit.apply(self, ['message', id].concat(msg));
- };
-};
-
-
View
2 lib/sockets/sock.js
@@ -40,7 +40,7 @@ function Socket() {
this.map = {};
this.settings = {};
this.format('none');
- this.set('identity', '\u0000');
+ this.set('identity', [process.pid, Date.now(), Math.random()].join('-'));
this.set('retry timeout', 100);
this.set('retry max timeout', 2000);
}
View
34 test/test.req.router.js
@@ -1,34 +0,0 @@
-
-// based on http://zguide.zeromq.org/page:all#header-49
-
-var ss = require('../')
- , should = require('should');
-
-var router = ss.socket('router')
- , worker = ss.socket('req');
-
-router.bind(3000);
-
-worker.set('identity', 'worker-a');
-worker.connect(3000);
-
-// workload
-worker.on('message', function(msg){
- msg.toString().should.equal('this is the workload');
- router.close();
- worker.close();
-});
-
-// send the workload back after "ready"
-router.on('message', function(addr, delimeter, msg){
- addr.toString().should.equal('worker-a');
- delimeter.toString().should.equal('\u0000');
- msg.toString().should.equal('ready');
- router.send(addr, delimeter, "this is the workload");
-});
-
-// worker tells router were ready
-worker.send('ready');
-
-
-
View
32 test/test.reqrep.chain.js
@@ -0,0 +1,32 @@
+
+var axon = require('..')
+ , should = require('should');
+
+var req = axon.socket('req')
+ , rep = axon.socket('rep')
+ , req2 = axon.socket('req')
+ , rep2 = axon.socket('rep');
+
+req.bind(3000);
+rep.connect(3000);
+
+req2.bind(3001);
+rep2.connect(3001);
+
+rep.on('message', function(msg, reply){
+ req2.send(msg, function(msg){
+ reply('got "' + msg + '"');
+ });
+});
+
+req.send('hello', function(msg){
+ msg.toString().should.equal('got "HELLO"');
+ req.close();
+ rep.close();
+ req2.close();
+ rep2.close();
+});
+
+rep2.on('message', function(msg, reply){
+ reply(msg.toString().toUpperCase());
+});
View
27 test/test.reqrep.js
@@ -1,22 +1,19 @@
-var ss = require('../')
+var axon = require('..')
, should = require('should');
-var server = ss.socket('rep')
- , client = ss.socket('req');
+var req = axon.socket('req')
+ , rep = axon.socket('rep');
-server.bind(3000);
-client.connect(3000);
+req.bind(3000);
+rep.connect(3000);
-server.on('message', function(msg, reply){
- msg.toString().should.equal('hey there tobi');
- reply('tobi says thanks');
+rep.on('message', function(msg, reply){
+ reply('got "' + msg + '"');
});
-client.on('message', function(msg){
- msg.toString().should.equal('tobi says thanks');
- client.close();
- server.close();
-});
-
-client.send('hey there tobi');
+req.send('hello', function(msg){
+ msg.toString().should.equal('got "hello"');
+ req.close();
+ rep.close();
+});
View
38 test/test.reqrep.json.js
@@ -1,31 +1,21 @@
-var ss = require('../')
- , should = require('should');
+var axon = require('..')
+ , should = require('should')
+ , req = axon.socket('req')
+ , rep = axon.socket('rep');
-var server = ss.socket('rep')
- , client = ss.socket('req');
+req.format('json');
+rep.format('json');
-server.format('json');
-client.format('json');
+req.bind(3000);
+rep.connect(3000);
-server.bind(3000);
-client.connect(3000);
-
-server.on('message', function(msg, reply){
- msg.should.have.property('cmd', 'hello');
- reply({
- error: null,
- result: 'thanks'
- });
-});
-
-client.on('message', function(msg){
- msg.should.have.property('error', null);
- msg.should.have.property('result', 'thanks');
- client.close();
- server.close();
+rep.on('message', function(obj, reply){
+ reply({ name: obj.name });
});
-client.send({
- cmd: 'hello'
+req.send({ name: 'Tobi' }, function(res){
+ res.should.eql({ name: 'Tobi' });
+ req.close();
+ rep.close();
});
View
38 test/test.reqrep.ordering.js
@@ -0,0 +1,38 @@
+
+var axon = require('..')
+ , should = require('should');
+
+var req = axon.socket('req')
+ , rep = axon.socket('rep');
+
+req.bind(3000);
+rep.connect(3000);
+
+var pending = 10
+ , n = pending
+ , msgs = [];
+
+rep.on('message', function(msg, reply){
+ reply('got "' + msg + '"');
+});
+
+while (n--) {
+ (function(n){
+ n = String(n);
+ setTimeout(function(){
+ req.send(n, function(msg){
+ msgs.push(msg.toString());
+ --pending || done();
+ });
+ }, Math.random() * 200 | 0);
+ })(n);
+}
+
+function done() {
+ msgs.should.have.length(10);
+ for (var i = 0; i < 10; ++i) {
+ msgs.should.include('got "' + i + '"');
+ }
+ req.close();
+ rep.close();
+}
View
19 test/test.reqrep.vargs.js
@@ -0,0 +1,19 @@
+
+var axon = require('..')
+ , should = require('should');
+
+var req = axon.socket('req')
+ , rep = axon.socket('rep');
+
+req.bind(3000);
+rep.connect(3000);
+
+rep.on('message', function(first, last, reply){
+ reply(first + ' ' + last)
+});
+
+req.send('tobi', 'ferret', function(msg){
+ msg.toString().should.equal('tobi ferret');
+ req.close();
+ rep.close();
+});
View
54 test/test.router.dealer.js
@@ -1,54 +0,0 @@
-
-// http://zguide.zeromq.org/page:all#header-48
-
-var ss = require('../')
- , should = require('should');
-
-var router = ss.socket('router')
- , workerA = ss.socket('dealer')
- , workerB = ss.socket('dealer');
-
-router.bind(3000);
-
-workerA.set('identity', 'worker-a');
-workerA.connect(3000);
-
-workerB.set('identity', 'worker-b');
-workerB.connect(3000);
-
-var n = 0;
-
-router.on('message', function(from, body){
- from = from.toString();
- body = body.toString();
-
- n += 1;
-
- if ('worker-a' == from) {
- body.should.equal('worker-a says hey');
- } else {
- body.should.equal('worker-b says hey');
- }
-
- if (2 === n) {
- workerA.close();
- workerB.close();
- router.close();
- }
-});
-
-workerA.on('message', function(msg){
- msg.toString().should.be.equal('hello worker-a');
- workerA.send('worker-a says hey');
-});
-
-workerB.on('message', function(msg){
- msg.toString().should.be.equal('hello worker-b');
- workerB.send('worker-b says hey');
-});
-
-setTimeout(function(){
- router.send('worker-a', 'hello worker-a');
- router.send('worker-b', 'hello worker-b');
-}, 50);
-
View
35 test/test.router.rep.js
@@ -1,35 +0,0 @@
-
-// based on http://zguide.zeromq.org/page:all#header-50
-
-var ss = require('../')
- , should = require('should');
-
-var router = ss.socket('router')
- , rep = ss.socket('rep');
-
-router.bind(3000);
-
-rep.set('identity', 'worker-a');
-rep.connect(3000);
-
-// workload
-rep.on('message', function(msg, reply){
- msg.toString().should.equal('this is the workload');
- reply('this is the reply');
-});
-
-// send the workload back after "ready"
-router.on('message', function(a, b, c){
- a.toString().should.equal('worker-a');
- b.toString().should.equal('\u0000');
- c.toString().should.equal('this is the reply');
- rep.close();
- router.close();
-});
-
-setTimeout(function(){
- router.send('worker-a', '\u0000', 'this is the workload');
-}, 50);
-
-
-

0 comments on commit 1355a66

Please sign in to comment.