diff --git a/lib/internal/streams/compose.js b/lib/internal/streams/compose.js index 912f9d7793eb9b..65a36e17710935 100644 --- a/lib/internal/streams/compose.js +++ b/lib/internal/streams/compose.js @@ -8,7 +8,6 @@ const { isReadable, isWritable, } = require('internal/streams/utils'); -const duplexify = require('internal/streams/duplexify'); const { AbortError, codes: { @@ -23,18 +22,18 @@ module.exports = function compose(...streams) { } if (streams.length === 1) { - return duplexify(streams[0], 'streams[0]'); + return Duplex.from(streams[0]); } const orgStreams = [...streams]; if (typeof streams[0] === 'function') { - streams[0] = duplexify(streams[0], 'streams[0]'); + streams[0] = Duplex.from(streams[0]); } if (typeof streams[streams.length - 1] === 'function') { const idx = streams.length - 1; - streams[idx] = duplexify(streams[idx], `streams[${idx}]`); + streams[idx] = Duplex.from(streams[idx]); } for (let n = 0; n < streams.length; ++n) { @@ -87,7 +86,7 @@ module.exports = function compose(...streams) { // Implement Writable/Readable/Duplex traits. // See, https://github.com/nodejs/node/pull/33515. d = new Duplex({ - highWaterMark: 1, + // TODO (ronag): highWaterMark? writableObjectMode: !!head?.writableObjectMode, readableObjectMode: !!tail?.writableObjectMode, writable, diff --git a/lib/internal/streams/duplexify.js b/lib/internal/streams/duplexify.js index 0c6e74b9c70719..0225096a445672 100644 --- a/lib/internal/streams/duplexify.js +++ b/lib/internal/streams/duplexify.js @@ -197,6 +197,9 @@ function _duplexify(pair) { onfinished(err); }); + // TODO(ronag): Avoid double buffering. + // Implement Writable/Readable/Duplex traits. + // See, https://github.com/nodejs/node/pull/33515. d = new Duplex({ // TODO (ronag): highWaterMark? readableObjectMode: !!r?.readableObjectMode, diff --git a/lib/stream.js b/lib/stream.js index dec9cec695076c..396dcf6b18d315 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -34,7 +34,8 @@ const compose = require('internal/streams/compose'); const { destroyer } = require('internal/streams/destroy'); const eos = require('internal/streams/end-of-stream'); const internalBuffer = require('internal/buffer'); -const duplexify = require('internal/streams/duplexify'); + +let duplexify; const promises = require('stream/promises'); @@ -43,6 +44,9 @@ Stream.Readable = require('internal/streams/readable'); Stream.Writable = require('internal/streams/writable'); Stream.Duplex = require('internal/streams/duplex'); Stream.Duplex.from = function from(body) { + if (!duplexify) { + duplexify = require('internal/streams/duplexify'); + } return duplexify(body, 'body'); }; Stream.Transform = require('internal/streams/transform'); diff --git a/test/parallel/test-bootstrap-modules.js b/test/parallel/test-bootstrap-modules.js index 1528a808a6dd03..f83506a183cfa4 100644 --- a/test/parallel/test-bootstrap-modules.js +++ b/test/parallel/test-bootstrap-modules.js @@ -101,10 +101,8 @@ const expectedModules = new Set([ 'NativeModule internal/stream_base_commons', 'NativeModule internal/streams/add-abort-signal', 'NativeModule internal/streams/buffer_list', - 'NativeModule internal/streams/compose', 'NativeModule internal/streams/destroy', 'NativeModule internal/streams/duplex', - 'NativeModule internal/streams/duplexify', 'NativeModule internal/streams/end-of-stream', 'NativeModule internal/streams/from', 'NativeModule internal/streams/legacy',