Permalink
Browse files

re-working Buffers and Streams.

  • Loading branch information...
1 parent 95e79c8 commit 371ba95f2ef77386b4ec8b47612db9d7276946af @seebees committed Sep 6, 2011
Showing with 683 additions and 254 deletions.
  1. +20 −18 lib/draino.js
  2. +339 −152 lib/streambuffer.js
  3. +324 −84 test/testStreamBuffer.js
View
@@ -5,26 +5,27 @@ var util = require('util'),
inherit = libs.inherit,
mixin = libs.mixin,
StreamBuffer = require('./streambuffer.js').StreamBuffer,
+ BufferedStream = require('./streambuffer.js').BufferedStream,
snake = require('./snake.js').snake,
actions = {
flush : function (source, options) {
var dest = this;
-
+
//we need a buffer to hold the outstanding streams
if (!Array.isArray(dest._buffer)) {
dest._buffer = [];
}
//Stream::pipe already gives us a var for this, why make my own?
dest._pipeCount = dest._pipeCount || 0;
dest._pipeCount += 1;
-
+
options = options || {};
-
+
//helper function to pull the next source or end the dest
function nextOrEnd() {
var next = dest._buffer.shift();
dest._pipeCount -= 1;
-
+
if (typeof next === 'function') {
next(true);
} else if (dest._pipeCount <= 0) {
@@ -33,11 +34,11 @@ var util = require('util'),
nextOrEnd();
}
}
-
+
//flush should result in ONLY one data event to the dest stream
//so all source streams are buffered.
var buff = new StreamBuffer(dest, source);
-
+
//helper function to do the work for this source
//this way flush and funnel can share the same buffer
function next (fromEndOrNext) {
@@ -62,10 +63,10 @@ var util = require('util'),
buff.on('full', next);
}
}
-
+
//push onto the stack to process
dest._buffer.push(next);
-
+
if (!options.serial) {
//once it's full move it to the top of the stack
buff.on('full', function () {
@@ -77,42 +78,42 @@ var util = require('util'),
}
});
}
-
+
if (dest._pipeCount === 1) {
//this is the first, just start it
buff.on('full', nextOrEnd);
}
-
+
//let's do that again!
return dest;
},
funnel : function (source, options) {
var dest = this;
-
+
//we need a buffer to hold the outstanding streams
if (!Array.isArray(dest._buffer)) {
dest._buffer = [];
}
//Stream::pipe already gives us a var for this, why make my own?
dest._pipeCount = dest._pipeCount || 0;
dest._pipeCount += 1;
-
+
options = options || {};
//if you really don't want to buffer something you need to tell me.
options.buffer = (options.buffer === false ? false : true);
-
+
//helper function to pull the next source or end the dest
function nextOrEnd () {
var next = dest._buffer.shift();
dest._pipeCount -= 1;
-
+
if (typeof next === 'function') {
next();
} else if (dest._pipeCount <= 0) {
dest.end();
}
}
-
+
//helper function to do the work for this source
//this way flush and funnel can share the same buffer
function next () {
@@ -141,7 +142,7 @@ var util = require('util'),
nextOrEnd();
}
}
-
+
if (dest._pipeCount === 1) {
//this is the first, just start it
//(using once so I don't have dangling closures)
@@ -170,7 +171,7 @@ var util = require('util'),
}
//push onto the stack to process
dest._buffer.push(next);
-
+
//let's do that again!
return dest;
}
@@ -179,14 +180,15 @@ var util = require('util'),
Stream,
actions
);
-
+
SerialPump.prototype.end = function () {
this.emit('end');
};
mixin(exports, {
SerialPump : SerialPump,
StreamBuffer : StreamBuffer,
+ BufferedStream : BufferedStream,
flush : function (source, dest, options) {
return actions.flush.call(dest, source, options);
},
Oops, something went wrong.

0 comments on commit 371ba95

Please sign in to comment.