From a5cf3feaf14ef3bc1442c17e0abbbdf2aaebb152 Mon Sep 17 00:00:00 2001
From: Mathias Buus
Date: Wed, 4 Apr 2018 16:52:19 +0200
Subject: [PATCH] stream: add pipeline and finished

PR-URL: https://github.com/nodejs/node/pull/19828
Reviewed-By: Matteo Collina
Reviewed-By: James M Snell
---
 doc/api/errors.md                     |   6 +
 doc/api/stream.md                     | 106 ++++++
 lib/internal/errors.js                |   1 +
 lib/internal/streams/end-of-stream.js |  96 +++++
 lib/internal/streams/pipeline.js      |  95 +++++
 lib/stream.js                         |   5 +
 node.gyp                              |   2 +
 test/parallel/test-stream-finished.js | 123 +++++++
 test/parallel/test-stream-pipeline.js | 483 ++++++++++++++++++++++++++
 9 files changed, 917 insertions(+)
 create mode 100644 lib/internal/streams/end-of-stream.js
 create mode 100644 lib/internal/streams/pipeline.js
 create mode 100644 test/parallel/test-stream-finished.js
 create mode 100644 test/parallel/test-stream-pipeline.js

diff --git a/doc/api/errors.md b/doc/api/errors.md
index 49834414decffe..00d30193df7d33 100644
--- a/doc/api/errors.md
+++ b/doc/api/errors.md
@@ -1431,6 +1431,12 @@ An attempt was made to call [`stream.pipe()`][] on a [`Writable`][] stream.
 
 An attempt was made to call [`stream.write()`][] with a `null` chunk.
 
+
+### ERR_STREAM_PREMATURE_CLOSE
+
+An error returned by `stream.finished()` and `stream.pipeline()` when a stream
+or pipeline ends abruptly without an explicit error.
+
 ### ERR_STREAM_PUSH_AFTER_EOF
 
diff --git a/doc/api/stream.md b/doc/api/stream.md
index 73eb8396de4f9d..705b58a31c5ce9 100644
--- a/doc/api/stream.md
+++ b/doc/api/stream.md
@@ -46,6 +46,9 @@ There are four fundamental stream types within Node.js:
 * [Transform][] - Duplex streams that can modify or transform the data as it
   is written and read (for example [`zlib.createDeflate()`][]).
 
+Additionally, this module includes the utility functions [pipeline][] and
+[finished][].
+
 ### Object Mode
 
 All streams created by Node.js APIs operate exclusively on strings and `Buffer`
@@ -1287,6 +1290,107 @@ implementors should not override this method, but instead implement
 [`readable._destroy()`][readable-_destroy].
 The default implementation of `_destroy()` for `Transform` also emits `'close'`.
 
+### stream.finished(stream, callback)
+
+* `stream` {Stream} A readable and/or writable stream.
+* `callback` {Function} A callback function that takes an optional error
+  argument.
+
+A function to get notified when a stream is no longer readable or writable, or
+has experienced an error or a premature close event.
+
+```js
+const { finished } = require('stream');
+const fs = require('fs');
+
+const rs = fs.createReadStream('archive.tar');
+
+finished(rs, (err) => {
+  if (err) {
+    console.error('Stream failed', err);
+  } else {
+    console.log('Stream is done reading');
+  }
+});
+
+rs.resume(); // drain the stream
+```
+
+`finished()` is especially useful in error handling scenarios where a stream
+is destroyed prematurely (like an aborted HTTP request) and will not emit
+`'end'` or `'finish'`.
+
+The `finished` API can also be promisified:
+
+```js
+const util = require('util');
+const stream = require('stream');
+const fs = require('fs');
+
+const finished = util.promisify(stream.finished);
+
+const rs = fs.createReadStream('archive.tar');
+
+async function run() {
+  await finished(rs);
+  console.log('Stream is done reading');
+}
+
+run().catch(console.error);
+rs.resume(); // drain the stream
+```
+
+### stream.pipeline(...streams[, callback])
+
+* `...streams` {Stream} Two or more streams to pipe between.
+* `callback` {Function} A callback function that takes an optional error
+  argument.
+
+A module method to pipe between streams, forwarding errors, properly cleaning
+up, and providing a callback when the pipeline is complete.
+
+```js
+const { pipeline } = require('stream');
+const fs = require('fs');
+const zlib = require('zlib');
+
+// Use the pipeline API to easily pipe a series of streams
+// together and get notified when the pipeline is fully done.
+
+// A pipeline to gzip a potentially huge tar file efficiently:
+
+pipeline(
+  fs.createReadStream('archive.tar'),
+  zlib.createGzip(),
+  fs.createWriteStream('archive.tar.gz'),
+  (err) => {
+    if (err) {
+      console.error('Pipeline failed', err);
+    } else {
+      console.log('Pipeline succeeded');
+    }
+  }
+);
+```
+
+The `pipeline` API can also be promisified:
+
+```js
+const util = require('util');
+const stream = require('stream');
+const fs = require('fs');
+const zlib = require('zlib');
+
+const pipeline = util.promisify(stream.pipeline);
+
+async function run() {
+  await pipeline(
+    fs.createReadStream('archive.tar'),
+    zlib.createGzip(),
+    fs.createWriteStream('archive.tar.gz')
+  );
+  console.log('Pipeline succeeded');
+}
+
+run().catch(console.error);
+```
+
 ## API for Stream Implementers
 
@@ -2397,6 +2501,8 @@ contain multi-byte characters.
 [http-incoming-message]: http.html#http_class_http_incomingmessage
 [zlib]: zlib.html
 [hwm-gotcha]: #stream_highwatermark_discrepancy_after_calling_readable_setencoding
+[pipeline]: #stream_stream_pipeline_streams_callback
+[finished]: #stream_stream_finished_stream_callback
 [stream-_flush]: #stream_transform_flush_callback
 [stream-_read]: #stream_readable_read_size_1
 [stream-_transform]: #stream_transform_transform_chunk_encoding_callback
diff --git a/lib/internal/errors.js b/lib/internal/errors.js
index 8505ec39b68642..2bc2c7bce57af5 100644
--- a/lib/internal/errors.js
+++ b/lib/internal/errors.js
@@ -961,6 +961,7 @@ E('ERR_STDOUT_CLOSE', 'process.stdout cannot be closed', Error);
 E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error);
 E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed', Error);
 E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError);
+E('ERR_STREAM_PREMATURE_CLOSE', 'Premature close', Error);
 E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error);
 E('ERR_STREAM_UNSHIFT_AFTER_END_EVENT',
   'stream.unshift() after end event', Error);
diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js
new file mode 100644
index 00000000000000..eeb8a61456a730
--- /dev/null
+++ b/lib/internal/streams/end-of-stream.js
@@ -0,0 +1,96 @@
+// Ported from https://github.com/mafintosh/end-of-stream with
+// permission from the author, Mathias Buus (@mafintosh).
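+//
+// eos(stream[, opts], callback) calls `callback` at most once: when the
+// stream has ended and/or finished, emitted 'error', or closed prematurely
+// (in which case an ERR_STREAM_PREMATURE_CLOSE error is passed). The
+// `opts.readable` and `opts.writable` flags select which side(s) to wait for,
+// and `opts.error: false` disables the 'error' listener. The returned function
+// removes every listener this helper attached.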
+ +'use strict'; + +const { + ERR_STREAM_PREMATURE_CLOSE +} = require('internal/errors').codes; + +function noop() {} + +function isRequest(stream) { + return stream.setHeader && typeof stream.abort === 'function'; +} + +function once(callback) { + let called = false; + return function(err) { + if (called) return; + called = true; + callback.call(this, err); + }; +} + +function eos(stream, opts, callback) { + if (typeof opts === 'function') return eos(stream, null, opts); + if (!opts) opts = {}; + + callback = once(callback || noop); + + const ws = stream._writableState; + const rs = stream._readableState; + let readable = opts.readable || (opts.readable !== false && stream.readable); + let writable = opts.writable || (opts.writable !== false && stream.writable); + + const onlegacyfinish = () => { + if (!stream.writable) onfinish(); + }; + + const onfinish = () => { + writable = false; + if (!readable) callback.call(stream); + }; + + const onend = () => { + readable = false; + if (!writable) callback.call(stream); + }; + + const onerror = (err) => { + callback.call(stream, err); + }; + + const onclose = () => { + if (readable && !(rs && rs.ended)) { + return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); + } + if (writable && !(ws && ws.ended)) { + return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); + } + }; + + const onrequest = () => { + stream.req.on('finish', onfinish); + }; + + if (isRequest(stream)) { + stream.on('complete', onfinish); + stream.on('abort', onclose); + if (stream.req) onrequest(); + else stream.on('request', onrequest); + } else if (writable && !ws) { // legacy streams + stream.on('end', onlegacyfinish); + stream.on('close', onlegacyfinish); + } + + stream.on('end', onend); + stream.on('finish', onfinish); + if (opts.error !== false) stream.on('error', onerror); + stream.on('close', onclose); + + return function() { + stream.removeListener('complete', onfinish); + stream.removeListener('abort', onclose); + stream.removeListener('request', onrequest); + if (stream.req) stream.req.removeListener('finish', onfinish); + stream.removeListener('end', onlegacyfinish); + stream.removeListener('close', onlegacyfinish); + stream.removeListener('finish', onfinish); + stream.removeListener('end', onend); + stream.removeListener('error', onerror); + stream.removeListener('close', onclose); + }; +} + +module.exports = eos; diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js new file mode 100644 index 00000000000000..7e87210a774c5f --- /dev/null +++ b/lib/internal/streams/pipeline.js @@ -0,0 +1,95 @@ +// Ported from https://github.com/mafintosh/pump with +// permission from the author, Mathias Buus (@mafintosh). 
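+//
+// pipeline(...streams[, callback]) pipes each stream into the next and
+// attaches a destroyer to every stream via eos(). If any stream emits 'error'
+// or closes prematurely, all of the streams are destroyed. `callback` is
+// called once, when the last stream has finished or been destroyed, and
+// receives the first error, if any. The last stream in the chain is returned.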
+ +'use strict'; + +const eos = require('internal/streams/end-of-stream'); + +const { + ERR_MISSING_ARGS, + ERR_STREAM_DESTROYED +} = require('internal/errors').codes; + +function once(callback) { + let called = false; + return function(err) { + if (called) return; + called = true; + callback(err); + }; +} + +function noop() {} + +function isRequest(stream) { + return stream.setHeader && typeof stream.abort === 'function'; +} + +function destroyer(stream, reading, writing, callback) { + callback = once(callback); + + let closed = false; + stream.on('close', () => { + closed = true; + }); + + eos(stream, { readable: reading, writable: writing }, (err) => { + if (err) return callback(err); + closed = true; + callback(); + }); + + let destroyed = false; + return (err) => { + if (closed) return; + if (destroyed) return; + destroyed = true; + + // request.destroy just do .end - .abort is what we want + if (isRequest(stream)) return stream.abort(); + if (typeof stream.destroy === 'function') return stream.destroy(); + + callback(err || new ERR_STREAM_DESTROYED('pipe')); + }; +} + +function call(fn) { + fn(); +} + +function pipe(from, to) { + return from.pipe(to); +} + +function popCallback(streams) { + if (!streams.length) return noop; + if (typeof streams[streams.length - 1] !== 'function') return noop; + return streams.pop(); +} + +function pipeline(...streams) { + const callback = popCallback(streams); + + if (Array.isArray(streams[0])) streams = streams[0]; + + if (streams.length < 2) { + throw new ERR_MISSING_ARGS('streams'); + } + + let error; + const destroys = streams.map(function(stream, i) { + const reading = i < streams.length - 1; + const writing = i > 0; + return destroyer(stream, reading, writing, function(err) { + if (!error) error = err; + if (err) destroys.forEach(call); + if (reading) return; + destroys.forEach(call); + callback(error); + }); + }); + + return streams.reduce(pipe); +} + +module.exports = pipeline; diff --git a/lib/stream.js b/lib/stream.js index ba056026d8b20b..7c235108c07256 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -22,6 +22,8 @@ 'use strict'; const { Buffer } = require('buffer'); +const pipeline = require('internal/streams/pipeline'); +const eos = require('internal/streams/end-of-stream'); // Note: export Stream before Readable/Writable/Duplex/... 
// to avoid a cross-reference(require) issues @@ -33,6 +35,9 @@ Stream.Duplex = require('_stream_duplex'); Stream.Transform = require('_stream_transform'); Stream.PassThrough = require('_stream_passthrough'); +Stream.pipeline = pipeline; +Stream.finished = eos; + // Backwards-compat with node 0.4.x Stream.Stream = Stream; diff --git a/node.gyp b/node.gyp index e2e6842c4f38c6..7ca158c25f04c8 100644 --- a/node.gyp +++ b/node.gyp @@ -154,6 +154,8 @@ 'lib/internal/streams/legacy.js', 'lib/internal/streams/destroy.js', 'lib/internal/streams/state.js', + 'lib/internal/streams/pipeline.js', + 'lib/internal/streams/end-of-stream.js', 'lib/internal/wrap_js_stream.js', 'deps/v8/tools/splaytree.js', 'deps/v8/tools/codemap.js', diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js new file mode 100644 index 00000000000000..2b0c156eb06845 --- /dev/null +++ b/test/parallel/test-stream-finished.js @@ -0,0 +1,123 @@ +'use strict'; + +const common = require('../common'); +const { Writable, Readable, Transform, finished } = require('stream'); +const assert = require('assert'); +const fs = require('fs'); +const { promisify } = require('util'); + +common.crashOnUnhandledRejection(); + +{ + const rs = new Readable({ + read() {} + }); + + finished(rs, common.mustCall((err) => { + assert(!err, 'no error'); + })); + + rs.push(null); + rs.resume(); +} + +{ + const ws = new Writable({ + write(data, enc, cb) { + cb(); + } + }); + + finished(ws, common.mustCall((err) => { + assert(!err, 'no error'); + })); + + ws.end(); +} + +{ + const tr = new Transform({ + transform(data, enc, cb) { + cb(); + } + }); + + let finish = false; + let ended = false; + + tr.on('end', () => { + ended = true; + }); + + tr.on('finish', () => { + finish = true; + }); + + finished(tr, common.mustCall((err) => { + assert(!err, 'no error'); + assert(finish); + assert(ended); + })); + + tr.end(); + tr.resume(); +} + +{ + const rs = fs.createReadStream(__filename); + + rs.resume(); + finished(rs, common.mustCall()); +} + +{ + const finishedPromise = promisify(finished); + + async function run() { + const rs = fs.createReadStream(__filename); + const done = common.mustCall(); + + let ended = false; + rs.resume(); + rs.on('end', () => { + ended = true; + }); + await finishedPromise(rs); + assert(ended); + done(); + } + + run(); +} + +{ + const rs = fs.createReadStream('file-does-not-exist'); + + finished(rs, common.mustCall((err) => { + assert.strictEqual(err.code, 'ENOENT'); + })); +} + +{ + const rs = new Readable(); + + finished(rs, common.mustCall((err) => { + assert(!err, 'no error'); + })); + + rs.push(null); + rs.emit('close'); // should not trigger an error + rs.resume(); +} + +{ + const rs = new Readable(); + + finished(rs, common.mustCall((err) => { + assert(err, 'premature close error'); + })); + + rs.emit('close'); // should trigger error + rs.push(null); + rs.resume(); +} diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js new file mode 100644 index 00000000000000..e63ee2ed117679 --- /dev/null +++ b/test/parallel/test-stream-pipeline.js @@ -0,0 +1,483 @@ +'use strict'; + +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const { Stream, Writable, Readable, Transform, pipeline } = require('stream'); +const assert = require('assert'); +const http = require('http'); +const http2 = require('http2'); +const { promisify } = require('util'); + +common.crashOnUnhandledRejection(); + +{ + let finished = false; + const 
processed = []; + const expected = [ + Buffer.from('a'), + Buffer.from('b'), + Buffer.from('c') + ]; + + const read = new Readable({ + read() {} + }); + + const write = new Writable({ + write(data, enc, cb) { + processed.push(data); + cb(); + } + }); + + write.on('finish', () => { + finished = true; + }); + + for (let i = 0; i < expected.length; i++) { + read.push(expected[i]); + } + read.push(null); + + pipeline(read, write, common.mustCall((err) => { + assert.ok(!err, 'no error'); + assert.ok(finished); + assert.deepStrictEqual(processed, expected); + })); +} + +{ + const read = new Readable({ + read() {} + }); + + assert.throws(() => { + pipeline(read, () => {}); + }, /ERR_MISSING_ARGS/); + assert.throws(() => { + pipeline(() => {}); + }, /ERR_MISSING_ARGS/); + assert.throws(() => { + pipeline(); + }, /ERR_MISSING_ARGS/); +} + +{ + const read = new Readable({ + read() {} + }); + + const write = new Writable({ + write(data, enc, cb) { + cb(); + } + }); + + read.push('data'); + setImmediate(() => read.destroy()); + + pipeline(read, write, common.mustCall((err) => { + assert.ok(err, 'should have an error'); + })); +} + +{ + const read = new Readable({ + read() {} + }); + + const write = new Writable({ + write(data, enc, cb) { + cb(); + } + }); + + read.push('data'); + setImmediate(() => read.destroy(new Error('kaboom'))); + + const dst = pipeline(read, write, common.mustCall((err) => { + assert.deepStrictEqual(err, new Error('kaboom')); + })); + + assert.strictEqual(dst, write); +} + +{ + const read = new Readable({ + read() {} + }); + + const transform = new Transform({ + transform(data, enc, cb) { + cb(new Error('kaboom')); + } + }); + + const write = new Writable({ + write(data, enc, cb) { + cb(); + } + }); + + read.on('close', common.mustCall()); + transform.on('close', common.mustCall()); + write.on('close', common.mustCall()); + + const dst = pipeline(read, transform, write, common.mustCall((err) => { + assert.deepStrictEqual(err, new Error('kaboom')); + })); + + assert.strictEqual(dst, write); + + read.push('hello'); +} + +{ + const server = http.createServer((req, res) => { + const rs = new Readable({ + read() { + rs.push('hello'); + rs.push(null); + } + }); + + pipeline(rs, res); + }); + + server.listen(0, () => { + const req = http.request({ + port: server.address().port + }); + + req.end(); + req.on('response', (res) => { + const buf = []; + res.on('data', (data) => buf.push(data)); + res.on('end', common.mustCall(() => { + assert.deepStrictEqual( + Buffer.concat(buf), + Buffer.from('hello') + ); + server.close(); + })); + }); + }); +} + +{ + const server = http.createServer((req, res) => { + const rs = new Readable({ + read() { + rs.push('hello'); + }, + destroy: common.mustCall((err, cb) => { + // prevents fd leaks by destroying http pipelines + cb(); + }) + }); + + pipeline(rs, res); + }); + + server.listen(0, () => { + const req = http.request({ + port: server.address().port + }); + + req.end(); + req.on('response', (res) => { + setImmediate(() => { + res.destroy(); + server.close(); + }); + }); + }); +} + +{ + const server = http.createServer((req, res) => { + const rs = new Readable({ + read() { + rs.push('hello'); + }, + destroy: common.mustCall((err, cb) => { + cb(); + }) + }); + + pipeline(rs, res); + }); + + let cnt = 10; + + const badSink = new Writable({ + write(data, enc, cb) { + cnt--; + if (cnt === 0) cb(new Error('kaboom')); + else cb(); + } + }); + + server.listen(0, () => { + const req = http.request({ + port: server.address().port + }); + + req.end(); + 
req.on('response', (res) => { + pipeline(res, badSink, common.mustCall((err) => { + assert.deepStrictEqual(err, new Error('kaboom')); + server.close(); + })); + }); + }); +} + +{ + const server = http.createServer((req, res) => { + pipeline(req, res, common.mustCall()); + }); + + server.listen(0, () => { + const req = http.request({ + port: server.address().port + }); + + const rs = new Readable({ + read() { + rs.push('hello'); + } + }); + + pipeline(rs, req, common.mustCall(() => { + server.close(); + })); + + req.on('response', (res) => { + let cnt = 10; + res.on('data', () => { + cnt--; + if (cnt === 0) rs.destroy(); + }); + }); + }); +} + +{ + const server = http2.createServer((req, res) => { + pipeline(req, res, common.mustCall()); + }); + + server.listen(0, () => { + const url = `http://localhost:${server.address().port}`; + const client = http2.connect(url); + const req = client.request({ ':method': 'POST' }); + + const rs = new Readable({ + read() { + rs.push('hello'); + } + }); + + pipeline(rs, req, common.mustCall((err) => { + // TODO: this is working around an http2 bug + // where the client keeps the event loop going + // (replacing the rs.destroy() with req.end() + // exits it so seems to be a destroy bug there + client.unref(); + + server.close(); + client.close(); + })); + + let cnt = 10; + req.on('data', (data) => { + cnt--; + if (cnt === 0) rs.destroy(); + }); + }); +} + +{ + const makeTransform = () => { + const tr = new Transform({ + transform(data, enc, cb) { + cb(null, data); + } + }); + + tr.on('close', common.mustCall()); + return tr; + }; + + const rs = new Readable({ + read() { + rs.push('hello'); + } + }); + + let cnt = 10; + + const ws = new Writable({ + write(data, enc, cb) { + cnt--; + if (cnt === 0) return cb(new Error('kaboom')); + cb(); + } + }); + + rs.on('close', common.mustCall()); + ws.on('close', common.mustCall()); + + pipeline( + rs, + makeTransform(), + makeTransform(), + makeTransform(), + makeTransform(), + makeTransform(), + makeTransform(), + ws, + common.mustCall((err) => { + assert.deepStrictEqual(err, new Error('kaboom')); + }) + ); +} + +{ + const oldStream = new Stream(); + + oldStream.pause = oldStream.resume = () => {}; + oldStream.write = (data) => { + oldStream.emit('data', data); + return true; + }; + oldStream.end = () => { + oldStream.emit('end'); + }; + + const expected = [ + Buffer.from('hello'), + Buffer.from('world') + ]; + + const rs = new Readable({ + read() { + for (let i = 0; i < expected.length; i++) { + rs.push(expected[i]); + } + rs.push(null); + } + }); + + const ws = new Writable({ + write(data, enc, cb) { + assert.deepStrictEqual(data, expected.shift()); + cb(); + } + }); + + let finished = false; + + ws.on('finish', () => { + finished = true; + }); + + pipeline( + rs, + oldStream, + ws, + common.mustCall((err) => { + assert(!err, 'no error'); + assert(finished, 'last stream finished'); + }) + ); +} + +{ + const oldStream = new Stream(); + + oldStream.pause = oldStream.resume = () => {}; + oldStream.write = (data) => { + oldStream.emit('data', data); + return true; + }; + oldStream.end = () => { + oldStream.emit('end'); + }; + + const destroyableOldStream = new Stream(); + + destroyableOldStream.pause = destroyableOldStream.resume = () => {}; + destroyableOldStream.destroy = common.mustCall(() => { + destroyableOldStream.emit('close'); + }); + destroyableOldStream.write = (data) => { + destroyableOldStream.emit('data', data); + return true; + }; + destroyableOldStream.end = () => { + destroyableOldStream.emit('end'); + 
}; + + const rs = new Readable({ + read() { + rs.destroy(new Error('stop')); + } + }); + + const ws = new Writable({ + write(data, enc, cb) { + cb(); + } + }); + + let finished = false; + + ws.on('finish', () => { + finished = true; + }); + + pipeline( + rs, + oldStream, + destroyableOldStream, + ws, + common.mustCall((err) => { + assert.deepStrictEqual(err, new Error('stop')); + assert(!finished, 'should not finish'); + }) + ); +} + +{ + const pipelinePromise = promisify(pipeline); + + async function run() { + const read = new Readable({ + read() {} + }); + + const write = new Writable({ + write(data, enc, cb) { + cb(); + } + }); + + read.push('data'); + read.push(null); + + let finished = false; + + write.on('finish', () => { + finished = true; + }); + + await pipelinePromise(read, write); + + assert(finished); + } + + run(); +}