Streams2 - base classes, zlib, crypto, fs #4348

merged 72 commits into from Dec 15, 2012


None yet

6 participants


This implements stream2 functionality for everything except net and http.

The tcp and pipe changes are working in the streams2-net branch, but they break http quite a lot, so it's taking some time to finish.

Please review, but I'd like to merge in as --no-ff so that it's easier to revert if necessary.

@fb55 fb55 and 1 other commented on an outdated diff Dec 3, 2012
this.path = path;
- this.fd = null;
- this.readable = true;
- this.paused = false;
- this.flags = 'r';
- this.mode = 438; /*=0666*/
- this.bufferSize = 64 * 1024;
- options = options || {};
+ this.fd = options.hasOwnProperty('fd') ? options.fd : null;
+ this.flags = options.hasOwnProperty('flags') ? options.flags : 'r';
+ this.mode = options.hasOwnProperty('mode') ? options.mode : 438; /*=0666*/
fb55 Dec 3, 2012

Wouldn't the in operator be more appropriate in such cases? I can imagine cases where you would want to have inheritance for settings. (This isn't a concern for this pr, but more for node in general.)

indutny Dec 3, 2012

I completely disagree. Keep in mind that you need to have setting stored in prototype object for this check to be false.

fb55 Dec 3, 2012
var s = fs.createReadStream("/foo.txt", { __proto__: defaultOpts, etc: "etc" });

And guess what happens when __proto__ either overwrites hasOwnProperty, or is null.

indutny Dec 3, 2012

Well, this one is certainly true, but lets face the truth. Users that are exploiting such stuff when using core APIs, are already doomed :)

fb55 Dec 3, 2012

I actually like the ability to specify the default options at one place, and overwrite them as needed. In some cases, this can greatly reduce the amount of duplicated code. (Maybe I'm doomed, but at least I have readable programs ^^.)

Without looking at the source code, you'll never guess that it won't work.

indutny Dec 3, 2012

Well, lets do it this way then options.fd !== undefined ? ... : .... I really dislike in keyword.

fb55 Dec 3, 2012

In this case, that's probably the better option, as it allows the cheap removal of settings (without switching the object to dictionary mode by using delete). Although you might want to use typeof options.fd != "undefined".


+1. I really like the new interfaces, although I'm afraid they'll affect speed in a serious manner. Are there any stats?

@indutny indutny commented on an outdated diff Dec 3, 2012
+module.exports = Readable;
+Readable.ReadableState = ReadableState;
+var Stream = require('stream');
+var util = require('util');
+var assert = require('assert');
+var StringDecoder;
+util.inherits(Readable, Stream);
+function ReadableState(options, stream) {
+ options = options || {};
+ // cast to an int
+ this.bufferSize = ~~this.bufferSize;
indutny Dec 3, 2012

Why not this.bufferSize | 0 ?

indutny Dec 3, 2012

Though, it's not really important.

@indutny indutny commented on the diff Dec 3, 2012
+ // the minimum number of bytes to buffer before emitting 'readable'
+ // default to pushing everything out as fast as possible.
+ this.lowWaterMark = options.hasOwnProperty('lowWaterMark') ?
+ options.lowWaterMark : 0;
+ // cast to ints.
+ assert(typeof this.bufferSize === 'number');
+ assert(typeof this.lowWaterMark === 'number');
+ assert(typeof this.highWaterMark === 'number');
+ this.bufferSize = ~~this.bufferSize;
+ this.lowWaterMark = ~~this.lowWaterMark;
+ this.highWaterMark = ~~this.highWaterMark;
+ assert(this.bufferSize >= 0);
+ assert(this.lowWaterMark >= 0);
+ assert(this.highWaterMark >= this.lowWaterMark,
+ this.highWaterMark + '>=' + this.lowWaterMark);
indutny Dec 3, 2012

I'm starting to imagine a lot of users complaining: "What 1 >= 10 error means?!" :)

May be you should prefix it with informative message?

isaacs Dec 3, 2012

Yes, good idea.

@indutny indutny commented on the diff Dec 3, 2012
+// Override this method or _write(chunk, cb)
+Writable.prototype.write = function(chunk, encoding, cb) {
+ var state = this._writableState;
+ if (typeof encoding === 'function') {
+ cb = encoding;
+ encoding = null;
+ }
+ if (state.ended) {
+ var er = new Error('write after end');
+ if (typeof cb === 'function')
+ cb(er);
+ this.emit('error', er);
indutny Dec 3, 2012

do you think it's a good idea to emit error after calling callback? I have no idea how this API could be useful for anyone...

isaacs Dec 3, 2012

No, I think it's stupid. But that's how fs streams work today, so that's what we get. Changing it would be fine, but it should happen in 0.11.x

indutny Dec 3, 2012

Ok, one step a the time. Makes sense.

@indutny indutny commented on the diff Dec 3, 2012
+ state.pipes.push(dest);
+ break;
+ }
+ state.pipesCount += 1;
+ if ((!pipeOpts || pipeOpts.end !== false) &&
+ dest !== process.stdout &&
+ dest !== process.stderr) {
+ src.once('end', onend);
+ dest.on('unpipe', function(readable) {
+ if (readable === src)
+ src.removeListener('end', onend);
+ });
+ }
+ if (pipeOpts && pipeOpts.chunkSize)
indutny Dec 3, 2012

This might confuse people.... I don't understand why should this option be exposed in APIs? For compatibility only? If so, probably it would better to display some warning on setting it twice..?

isaacs Dec 3, 2012

Actually, we don't need chunkSize there at all. I'm not sure where that idea even came from, but it should probably just be removed.

Node.js Foundation member

Whoa, finally reviewed it... Looks good to me! :)

Node.js Foundation member

And +1 @isaacs for such a big work!


@fb55 I've been using the benchmark/net-pipe.js script against the streams2-net branch and master. There is a very very slight decrease in speed, but it's dwarfed by the overhead of IO. It'll be easier to really dig into that once it's working in http, because we have better tooling to analyze http performance, and that's what everyone cares about.

Once the .once() and Array.forEach and Function.bind calls were removed, it was much better.

isaacs added some commits Oct 5, 2012
@isaacs isaacs Don't allow invalid encodings in StringDecoder class 314c6b3
@isaacs isaacs streams2: The new stream base classes 420e07c
@isaacs isaacs module: Support cycles in native module requires 372cb32
@isaacs isaacs Add 'stream' as a native module in repl 17834ed
@isaacs isaacs streams2: Convert strings to buffers before passing to _write() 639fbe2
@isaacs isaacs streams2: setEncoding and abstract out endReadable 9b5abe5
@isaacs isaacs streams2: Set flowing=true when flowing 51a52c4
@isaacs isaacs streams2: Make Transform streams pull-style
That is, the transform is triggered by a _read, not by a _write.

This way, backpressure works properly.
@isaacs isaacs transform: Automatically read() on _write when read buffer is empty caa853b
@isaacs isaacs streams2: Allow 0 as a lowWaterMark value 02f017d
@isaacs isaacs streams2: Correct drain/return logic
It was testing the length *before* adding the current chunk, which
is the opposite of correct.

Also, the return value was flipped.
@isaacs isaacs streams2: Handle immediate synthetic transforms properly 8acb416
@isaacs isaacs streams2: Tests of new interfaces 9b1b854
@isaacs isaacs streams2: ctor guards on Stream classes 545f512
@isaacs isaacs streams2: Allow Writables to opt out of pre-buffer-izing 0678480
@isaacs isaacs streams2: Support write(chunk,[encoding],[callback]) 71e2b61
@isaacs isaacs test: Writable bufferizing, non-bufferizing, and callbacks f3e71eb
@isaacs isaacs streams2: Fix regression from Duplex ctor assignment e82d06b
@isaacs isaacs test: fixture for streams2 testing acfb0ef
@isaacs isaacs streams2: flow() is not always bound to src cf0b4ba
@isaacs isaacs streams2: Use StringDecoder.end f624ccb
@isaacs isaacs streams2: Abstract out onread function 286aa04
@isaacs isaacs streams2: Fix duplex no-half-open logic 5856823
@isaacs isaacs streams2: Export Readable/Writable State classes 63ac07b
@isaacs isaacs streams2: Add high water mark for Readable
Also, organize the numeric settings a bit on the ReadableState class
@isaacs isaacs streams2: Set Readable lwm to 0 by default 62dd040
@isaacs isaacs streams2: Only emit 'readable' when needed 286c544
@isaacs isaacs streams2: Writable organization, add 'finishing' flag 0118584
@isaacs isaacs test: Update stream2 transform for corrected behavior c2f62d4
@isaacs isaacs streams2: Do multipipe without always using forEach
The Array.forEach call is too expensive.
@isaacs isaacs streams2: Remove function.bind() usage
It's too slow, unfortunately.
@isaacs isaacs streams2: Get rid of .once() usage in Readable.pipe
Significant performance impact
@isaacs isaacs streams2: Set 'readable' flag on Readable streams 53fa66d
@isaacs isaacs streams2: Refactor out .once() usage from Readable.pipe() 4b4ff2d
@isaacs isaacs streams2: Handle pipeChunkSize properly ac5a185
@isaacs isaacs streams2: Remove pipe if the dest emits error 49ea653
@isaacs isaacs streams2: Unpipe on dest.emit('close') d58f265
@isaacs isaacs zlib: streams2 0e01d63
@isaacs isaacs test: Fix test-repl-autolibs inspect call 79fd962
@isaacs isaacs fs: streams2 44b308b
@isaacs isaacs test: simple/test-file-write-stream needs to use 0 lowWaterMark 70461c3
@isaacs isaacs test: Writable stream end() method doesn't take a callback 3d3a0b3
@isaacs isaacs crypto: Streaming interface for Hash 90de2dd
@isaacs isaacs crypto: Streaming api for Hmac 175f78c
@isaacs isaacs crypto: Streaming interface for cipher/decipher/iv e336134
@isaacs isaacs crypto: Streaming interface for Sign and Verify dd3ebb8
@isaacs isaacs test: Tests for streaming crypto interfaces e0c600e
@isaacs isaacs doc: Crypto streaming interface 4a32d53
@isaacs isaacs streams2: Set readable=false on end 83704f1
@isaacs isaacs streams2: Switch to old-mode immediately, not nextTick
This fixes the CONNECT/Upgrade HTTP functionality, which was not getting
sliced properly, because readable wasn't emitted on this tick.


@isaacs isaacs streams2: pause() should be immediate 99021b7
@isaacs isaacs streams2: NextTick the emit('readable') in resume()
Otherwise resume() will cause data to be emitted before it can be handled.
@isaacs isaacs test: Sync writables may emit finish before callbacks f8bb031
@isaacs isaacs lint fc7d8d5
@isaacs isaacs streams2: Call read(0) on resume()
Otherwise (especially with stdin) you sometimes end up in cases
where the high water mark is zero, and the current buffer is at 0,
and it doesn't need a readable event, so it never calls _read().
@isaacs isaacs streams2: Writable only emit 'finish' once 5760244
@isaacs isaacs benchmark: Add once() function to net-pipe benchmark fixture 7742257
@isaacs isaacs streams2: Support a Readable hwm of 0
Necessary for proper stdin functioning
@isaacs isaacs streams2: Emit pause/resume events 04541cf
@isaacs isaacs streams2: Remove extraneous bufferSize setting 854171d
@isaacs isaacs docs: streams2 20a88fe
@isaacs isaacs child_process: Remove stream.pause/resume calls
Unnecessary in streams2
@isaacs isaacs net: Refactor to use streams2
This is a combination of 6 commits.

* XXX net fixup lcase stream

* net: Refactor to use streams2

    Use 'socket.resume()' in many tests to trigger old-mode behavior.

* net: Call destroy() if shutdown() is not provided

    This is important for TTY wrap streams

* net: Call .end() in socket.destroySoon if necessary

    This makes the http 1.0 keepAlive test pass, also.

* net wtf-ish stuff kinda busted

* net fixup
@isaacs isaacs tty/stdin: Refactor for streams2 bb56dcc
@isaacs isaacs test updates b4df1e6
@isaacs isaacs test: Fix many tests for streams2 net refactor 695abba
@isaacs isaacs http: Refactor for streams2
Because of some of the peculiarities of http, this has a bit of special
magic to handle cases where the IncomingMessage would wait forever in a
paused state.

In the server, if you do not begin consuming the request body by the
time the response emits 'finish', then it will be flushed out.

In the client, if you do not add a 'response' handler onto the request,
then the response stream will be flushed out.
@isaacs isaacs test updates for streams2 19ecc3a
@isaacs isaacs test: Fix many tests for http streams2 refactor 0977638
@isaacs isaacs streams2: Still emit error if there was a write() cb 3751c0f
@isaacs isaacs test: Update simple/test-fs-{write,read}-stream-err for streams2
Streams2 style streams might have already kicked off a read() or write()
before emitting 'data' events.  Make the test less dependent on ordering
of when data events occur.
@isaacs isaacs test: Update message tests for streams2 cd51fa8
@isaacs isaacs merged commit cd51fa8 into nodejs:master Dec 15, 2012

What if we want d._writableState.objectMode = true and d._readableState.objectMode = false

Or what if we want different watermarks for either side.

I would also like a way to set distinct options for the Readable and Writable sides of the stream.


TypeError: Object [object Object] has no method 'end'
at Object.<anonymous> (/tmp/node/test/simple/test-crypto.js:290:9)
at Module._compile (module.js:449:26)
at Object.Module._extensions..js (module.js:467:10)
at Module.load (module.js:356:32)
at Function.Module._load (module.js:312:12)
at Module.runMain (module.js:492:10)
at process.startup.processNextTick.process._tickCallback (node.js:325:13)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment