Permalink
Browse files

add HWM support. Closes #19

  • Loading branch information...
1 parent 7f75c26 commit 782922172640e48380738b561b461eb053763377 @tj committed Jan 1, 2013
Showing with 120 additions and 25 deletions.
  1. +4 −22 Readme.md
  2. +50 −0 examples/hwm/index.js
  3. +20 −3 lib/plugins/queue.js
  4. +1 −0 lib/sockets/sock.js
  5. +45 −0 test/test.hwm.js
View
@@ -25,6 +25,8 @@
- `connect` when connected to the peer, or a peer connection is accepted
- `disconnect` when an accepted peer disconnects
- `bind` when the server is bound
+ - `drop` (msg) when a message is dropped due to the HWM
+ - `flush` (msgs) queued when messages are flushed on connection
## Patterns
@@ -231,6 +233,7 @@ Every socket has associated options that can be configured via `get/set`.
- `identity` - the "name" of the socket that uniqued identifies it.
- `retry timeout` - connection retry timeout in milliseconds [100]
- `retry max timeout` - the cap for retry timeout length in milliseconds [5000]
+ - `hwm` - the high water mark threshold for queues [Infinity]
## Binding / Connecting
@@ -367,26 +370,5 @@ $ make test
## License
-(The MIT License)
-
-Copyright (c) 2012 TJ Holowaychuk <tj@vision-media.ca>
-
-Permission is hereby granted, free of charge, to any person obtaining
-a copy of this software and associated documentation files (the
-'Software'), to deal in the Software without restriction, including
-without limitation the rights to use, copy, modify, merge, publish,
-distribute, sublicense, and/or sell copies of the Software, and to
-permit persons to whom the Software is furnished to do so, subject to
-the following conditions:
-
-The above copyright notice and this permission notice shall be
-included in all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND,
-EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
-IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
-CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
-TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
-SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ MIT
View
@@ -0,0 +1,50 @@
+
+var axon = require('../..')
+ , push = axon.socket('push')
+ , pull = axon.socket('pull')
+
+// by default the high water mark (HWM)
+// is Infinity, allowing the queue to
+// grow unbounded. Here it is manually
+// set to 20 for demonstration purposes
+
+push.connect(3000);
+push.set('hwm', 20);
+
+// the receiver (pull socket) periodically
+// unbinds to simulate a poor client
+
+setInterval(function(){
+ console.log('unbind');
+ pull.close();
+ setTimeout(function(){
+ console.log('bind');
+ pull.bind(3000);
+ }, 250);
+}, 2000);
+
+// send more messages than
+// the client can handle
+
+var id = 0;
+setInterval(function(){
+ push.send(String(++id));
+}, 10);
+
+// received messasges
+
+pull.on('message', function(msg){
+ console.log('recv %s', msg);
+});
+
+// dropped messages
+
+push.on('drop', function(msg){
+ console.log('drop %s', msg);
+});
+
+// flushed messages
+
+push.on('flush', function(msgs){
+ console.log('flush %d msgs', msgs.length);
+});
@tj

tj Jan 1, 2013

Owner

this example sucks, it's hard to illustrate in a meaningful way without displaying a ton of output to show what's going on

View
@@ -8,11 +8,14 @@ var debug = require('debug')('axon:queue');
/**
* Queue plugin.
*
- * Provides an `enqueue` method to the `sock`. Messages
+ * Provides an `.enqueue()` method to the `sock`. Messages
* passed to `enqueue` will be buffered until the next
* `connect` event is emitted.
*
- * TODO: HWM via `opts`?
+ * Emits:
+ *
+ * - `drop` (msg) when a message is dropped
+ * - `flush` (msgs) when the queue is flushed
*
* @param {Object} options
* @api private
@@ -36,19 +39,33 @@ module.exports = function(options){
sock.on('connect', function(){
var len = buf.length;
debug('flush %d messages', len);
+
for (var i = 0; i < len; ++i) {
this.send(buf[i]);
}
+
+ var prev = buf;
buf = [];
+ sock.emit('flush', prev);
});
/**
* Pushes `msg` into `buf`.
*/
sock.enqueue = function(msg){
+ var hwm = sock.settings.hwm;
+ if (buf.length >= hwm) return drop(msg);
buf.push(msg);
};
+ /**
+ * Drop the given `msg`.
+ */
+
+ function drop(msg) {
+ debug('drop');
+ sock.emit('drop', msg);
+ }
};
-};
+};
View
@@ -53,6 +53,7 @@ function Socket() {
this.socks = [];
this.settings = {};
this.format('none');
+ this.set('hwm', Infinity);
this.set('identity', String(process.pid));
this.set('retry timeout', 100);
this.set('retry max timeout', 5000);
View
@@ -0,0 +1,45 @@
+
+var axon = require('../')
+ , should = require('should');
+
+var push = axon.socket('push')
+ , pull = axon.socket('pull');
+
+push.set('hwm', 5);
+push.connect(3000);
+
+push.send('1');
+push.send('2');
+push.send('3');
+push.send('4');
+push.send('5');
+
+// check that messages are dropped
+
+push.once('drop', function(msg){
+ msg.toString().should.equal('6');
+
+ push.once('drop', function(msg){
+ msg.toString().should.equal('7');
+
+ pull.bind(3000);
+ push.once('flush', function(buf){
+ buf.should.eql(['1', '2', '3', '4', '5']);
+ push.send('8');
+
+ var vals = [];
+ pull.on('message', function(msg){
+ vals.push(msg.toString());
+ if ('8' == msg.toString()) {
+ vals.should.eql(['1', '2', '3', '4', '5', '8']);
+ push.close();
+ pull.close();
+ }
+ });
+ });
+ });
+
+ push.send('7');
+});
+
+push.send('6');

2 comments on commit 7829221

Collaborator

gjohnson replied Jan 1, 2013

Ah, this is awesome.

Owner

tj replied Jan 1, 2013

it's a start, still potentially pretty leaky due to #81 but beats nothing! back pressure with node requires sooooooo much extra work

Please sign in to comment.