diff --git a/lib/internal/streams/pipe.js b/lib/internal/streams/pipe.js new file mode 100644 index 00000000000000..023dc0d3283a59 --- /dev/null +++ b/lib/internal/streams/pipe.js @@ -0,0 +1,57 @@ + +'use strict'; + +const pipeline = require('internal/streams/pipeline'); +const Duplex = require('internal/streams/duplex'); + +module.exports = function pipe(...streams) { + let cb; + let ret; + + const r = pipeline(streams, function(err) { + if (cb) { + cb(err); + } else { + ret.destroy(err); + } + }); + const w = streams[0]; + + ret = new Duplex({ + writable: !!w?.writable, + readable: !!r?.readable, + objectMode: streams[0].readableObjectMode, + highWaterMark: 1 + }); + + if (ret.writable) { + ret._write = function(chunk, encoding, callback) { + w.write(chunk, encoding, callback); + }; + + ret._final = function(chunk, encoding, callback) { + w.end(chunk, encoding, callback); + }; + } + + if (ret.readable) { + ret._read = function() { + r.resume(); + }; + + r + .on('data', function(buf) { + if (!ret.push(buf)) { + this.pause(); + } + }) + .on('end', function() { + ret.push(null); + }); + } + + ret._destroy = function(err, callback) { + cb = callback; + streams[0].destroy(err); + }; +}