Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Transform class for doing transforms

  • Loading branch information...
commit 4e20357604fa6911c28a5a76f691fd116d1cac0b 1 parent 8c9736d
@isaacs isaacs authored
Showing with 274 additions and 58 deletions.
  1. +10 −46 passthrough.js
  2. +149 −12 test/passthrough.js
  3. +115 −0 transform.js
View
56 passthrough.js
@@ -1,56 +1,20 @@
'use strict';
// a passthrough stream.
-// whatever you .write(), you can then .read() later.
-// this is not very useful on its own, but it's a handy
-// base class for certain sorts of simple filters and
-// transforms.
+// basically just the most minimal sort of Transform stream.
+// Every written chunk gets output as-is.
module.exports = PassThrough;
-var Readable = require('./readable.js');
-var util = require('util');
-
-util.inherits(PassThrough, Readable);
-
-var fromList = require('./from-list.js');
+var Transform = require('./transform.js');
-function PassThrough() {
- Readable.apply(this);
+var util = require('util');
+util.inherits(PassThrough, Transform);
- this.buffer = [];
- this.length = 0;
+function PassThrough(options) {
+ Transform.call(this, options);
}
-// override this:
-PassThrough.prototype.transform = function(c) {
- return c;
-};
-
-PassThrough.prototype.write = function(c) {
- var needEmitReadable = this.length === 0;
-
- c = this.transform(c);
- if (!c || !c.length) return true;
-
- this.buffer.push(c);
- this.length += c.length;
- if (needEmitReadable) this.emit('readable');
- return (this.length === 0);
-};
-
-PassThrough.prototype.end = function(c) {
- this.ended = true;
- if (c && c.length) this.write(c);
- else if (!this.length) this.emit('end');
-};
-
-PassThrough.prototype.read = function(n) {
- if (!n || n >= this.length) n = this.length;
- var ret = fromList(n, this.buffer, this.length);
- this.length = Math.max(this.length - n, 0);
- if (this.length === 0) {
- var ev = this.ended ? 'end' : 'drain';
- process.nextTick(this.emit.bind(this, ev));
- }
- return ret;
+PassThrough.prototype._transform = function(chunk, output, cb) {
+ output(chunk);
+ cb();
};
View
161 test/passthrough.js
@@ -1,4 +1,5 @@
var PassThrough = require('../passthrough.js');
+var Transform = require('../transform.js');
var test = require('tap').test;
test('passthrough', function(t) {
@@ -8,6 +9,7 @@ test('passthrough', function(t) {
pt.write(new Buffer('bark'));
pt.write(new Buffer('bazy'));
pt.write(new Buffer('kuel'));
+ pt.end();
t.equal(pt.read(5).toString(), 'foogb');
t.equal(pt.read(5).toString(), 'arkba');
@@ -16,18 +18,20 @@ test('passthrough', function(t) {
t.end();
});
-test('passthrough with transform', function(t) {
- pt = new PassThrough;
- pt.transform = function(c) {
+test('simple transform', function(t) {
+ var pt = new Transform;
+ pt._transform = function(c, output, cb) {
var ret = new Buffer(c.length);
ret.fill('x');
- return ret;
+ output(ret);
+ cb();
};
pt.write(new Buffer('foog'));
pt.write(new Buffer('bark'));
pt.write(new Buffer('bazy'));
pt.write(new Buffer('kuel'));
+ pt.end();
t.equal(pt.read(5).toString(), 'xxxxx');
t.equal(pt.read(5).toString(), 'xxxxx');
@@ -36,10 +40,126 @@ test('passthrough with transform', function(t) {
t.end();
});
+test('async passthrough', function(t) {
+ var pt = new Transform;
+ pt._transform = function(chunk, output, cb) {
+ setTimeout(function() {
+ output(chunk);
+ cb();
+ }, 10);
+ };
+
+ pt.write(new Buffer('foog'));
+ pt.write(new Buffer('bark'));
+ pt.write(new Buffer('bazy'));
+ pt.write(new Buffer('kuel'));
+ pt.end();
+
+ setTimeout(function() {
+ t.equal(pt.read(5).toString(), 'foogb');
+ t.equal(pt.read(5).toString(), 'arkba');
+ t.equal(pt.read(5).toString(), 'zykue');
+ t.equal(pt.read(5).toString(), 'l');
+ t.end();
+ }, 100);
+});
+
+test('assymetric transform (expand)', function(t) {
+ var pt = new Transform;
+
+ // emit each chunk 2 times.
+ pt._transform = function(chunk, output, cb) {
+ setTimeout(function() {
+ output(chunk);
+ setTimeout(function() {
+ output(chunk);
+ cb();
+ }, 10)
+ }, 10);
+ };
+
+ pt.write(new Buffer('foog'));
+ pt.write(new Buffer('bark'));
+ pt.write(new Buffer('bazy'));
+ pt.write(new Buffer('kuel'));
+ pt.end();
+
+ setTimeout(function() {
+ t.equal(pt.read(5).toString(), 'foogf');
+ t.equal(pt.read(5).toString(), 'oogba');
+ t.equal(pt.read(5).toString(), 'rkbar');
+ t.equal(pt.read(5).toString(), 'kbazy');
+ t.equal(pt.read(5).toString(), 'bazyk');
+ t.equal(pt.read(5).toString(), 'uelku');
+ t.equal(pt.read(5).toString(), 'el');
+ t.end();
+ }, 100);
+});
+
+test('assymetric transform (compress)', function(t) {
+ var pt = new Transform;
+
+ // each output is the first char of 3 consecutive chunks,
+ // or whatever's left.
+ pt.state = '';
+
+ pt._transform = function(chunk, output, cb) {
+ if (!chunk)
+ chunk = '';
+ var s = chunk.toString();
+ setTimeout(function() {
+ this.state += s.charAt(0);
+ if (this.state.length === 3) {
+ console.error('call output!')
+ output(new Buffer(this.state));
+ this.state = '';
+ }
+ cb();
+ }.bind(this), 10);
+ };
+
+ pt._flush = function(output, cb) {
+ // just output whatever we have.
+ setTimeout(function() {
+ output(new Buffer(this.state));
+ this.state = '';
+ cb();
+ }.bind(this), 10);
+ };
+
+ pt._writableState.lowWaterMark = 3;
+
+ pt.write(new Buffer('aaaa'));
+ pt.write(new Buffer('bbbb'));
+ pt.write(new Buffer('cccc'));
+ pt.write(new Buffer('dddd'));
+ pt.write(new Buffer('eeee'));
+ pt.write(new Buffer('aaaa'));
+ pt.write(new Buffer('bbbb'));
+ pt.write(new Buffer('cccc'));
+ pt.write(new Buffer('dddd'));
+ pt.write(new Buffer('eeee'));
+ pt.write(new Buffer('aaaa'));
+ pt.write(new Buffer('bbbb'));
+ pt.write(new Buffer('cccc'));
+ pt.write(new Buffer('dddd'));
+ pt.end();
+
+ // 'abcdeabcdeabcd'
+ setTimeout(function() {
+ t.equal(pt.read(5).toString(), 'abcde');
+ t.equal(pt.read(5).toString(), 'abcde');
+ t.equal(pt.read(5).toString(), 'abcd');
+ t.end();
+ }, 200);
+});
+
+
test('passthrough reordered', function(t) {
- pt = new PassThrough;
+ var pt = new PassThrough;
var emits = 0;
pt.on('readable', function() {
+ console.error('emit readable', emits)
emits++;
});
@@ -47,14 +167,22 @@ test('passthrough reordered', function(t) {
pt.write(new Buffer('bark'));
t.equal(pt.read(5).toString(), 'foogb');
- t.equal(pt.read(5).toString(), 'ark');
t.equal(pt.read(5), null);
+ console.error('need emit 0');
+
pt.write(new Buffer('bazy'));
pt.write(new Buffer('kuel'));
- t.equal(pt.read(5).toString(), 'bazyk');
- t.equal(pt.read(5).toString(), 'uel');
+ t.equal(pt.read(5).toString(), 'arkba');
+ t.equal(pt.read(5).toString(), 'zykue');
+ t.equal(pt.read(5), null);
+
+ console.error('need emit 1');
+
+ pt.end();
+
+ t.equal(pt.read(5).toString(), 'l');
t.equal(pt.read(5), null);
t.equal(emits, 2);
@@ -62,6 +190,7 @@ test('passthrough reordered', function(t) {
});
test('passthrough facaded', function(t) {
+ console.error('passthrough facaded');
var pt = new PassThrough;
var datas = [];
pt.on('data', function(chunk) {
@@ -74,8 +203,16 @@ test('passthrough facaded', function(t) {
});
pt.write(new Buffer('foog'));
- pt.write(new Buffer('bark'));
- pt.write(new Buffer('bazy'));
- pt.write(new Buffer('kuel'));
- pt.end();
+ setTimeout(function() {
+ pt.write(new Buffer('bark'));
+ setTimeout(function() {
+ pt.write(new Buffer('bazy'));
+ setTimeout(function() {
+ pt.write(new Buffer('kuel'));
+ setTimeout(function() {
+ pt.end();
+ }, 10);
+ }, 10);
+ }, 10);
+ }, 10);
});
View
115 transform.js
@@ -0,0 +1,115 @@
+// a transform stream is a readable/writable stream where you do
+// something with the data. Sometimes it's called a "filter",
+// but that's not a great name for it, since that implies a thing where
+// some bits pass through, and others are simply ignored. (That would
+// be a valid example of a transform, of course.)
+//
+// While the output is causally related to the input, it's not a
+// necessarily symmetric or synchronous transformation. For example,
+// a zlib stream might take multiple plain-text writes(), and then
+// emit a single compressed chunk some time in the future.
+
+'use strict';
+
+module.exports = Transform;
+
+var Readable = require('./readable.js');
+var Writable = require('./writable.js');
+
+var util = require('util');
+
+util.inherits(Transform, Readable);
+
+// parasitic inheritance.
+Object.keys(Writable.prototype).forEach(function(method) {
+ if (!Transform.prototype[method])
+ Transform.prototype[method] = Writable.prototype[method];
+});
+
+function Transform(options) {
+ Readable.call(this, options);
+ Writable.call(this, options);
+
+ // bind output so that it can be passed around as a regular function.
+ this._output = this._output.bind(this);
+
+ // when the writable side finishes, then flush out anything remaining.
+ this.once('finish', function() {
+ if ('function' === typeof this._flush)
+ this._flush(this._output, done.bind(this));
+ else
+ done.call(this);
+ });
+}
+
+// This is the part where you do stuff!
+// override this function in implementation classes.
+// 'chunk' is an input chunk.
+//
+// Call `output(newChunk)` to pass along transformed output
+// to the readable side. You may call 'output' zero or more times.
+//
+// Call `cb(err)` when you are done with this chunk. If you pass
+// an error, then that'll put the hurt on the whole operation. If you
+// never call cb(), then you'll never get another chunk.
+Transform.prototype._transform = function(chunk, output, cb) {
+ throw new Error('not implemented');
+};
+
+
+Transform.prototype._write = function(chunk, cb) {
+ this._transform(chunk, this._output, cb);
+};
+
+Transform.prototype._read = function(n, cb) {
+ var ws = this._writableState;
+ var rs = this._readableState;
+
+ // basically a no-op, since the _transform will fill the
+ // _readableState.buffer and emit 'readable' for us, and set ended
+ // Usually, we want to just not call the cb, and set the reading
+ // flag to false, so that another _read will happen next time,
+ // but no state changes.
+ rs.reading = false;
+
+ // however, if the writable side has ended, and its buffer is clear,
+ // then that means that the input has all been consumed, and no more
+ // will ever be provide. treat this as an EOF, and pass back 0 bytes.
+ if ((ws.ended || ws.ending) && ws.length === 0)
+ cb();
+};
+
+Transform.prototype._output = function(chunk) {
+ if (!chunk || !chunk.length)
+ return;
+
+ var state = this._readableState;
+ var len = state.length;
+ state.buffer.push(chunk);
+ state.length += chunk.length;
+ if (state.needReadable) {
+ state.needReadable = false;
+ this.emit('readable');
+ }
+};
+
+function done(er) {
+ if (er)
+ return this.emit('error', er);
+
+ // if there's nothing in the write buffer, then that means
+ // that nothing more will ever be provided
+ var ws = this._writableState;
+ var rs = this._readableState;
+
+ rs.ended = true;
+ // we may have gotten a 'null' read before, and since there is
+ // no more data coming from the writable side, we need to emit
+ // now so that the consumer knows to pick up the tail bits.
+ if (rs.length && rs.needReadable)
+ this.emit('readable');
+ else if (rs.length === 0) {
+ this.emit('end');
+ }
+}
+

0 comments on commit 4e20357

Please sign in to comment.
Something went wrong with that request. Please try again.