diff --git a/lib/internal/streams/buffer_list.js b/lib/internal/streams/buffer_list.js index 2551bee4473de4..2dc803d6fa0425 100644 --- a/lib/internal/streams/buffer_list.js +++ b/lib/internal/streams/buffer_list.js @@ -1,7 +1,9 @@ 'use strict'; const { + StringPrototypeSlice, SymbolIterator, + TypedArrayPrototypeSet, Uint8Array, } = primordials; @@ -67,7 +69,7 @@ module.exports = class BufferList { let p = this.head; let i = 0; while (p) { - ret.set(p.data, i); + TypedArrayPrototypeSet(ret, p.data, i); i += p.data.length; p = p.next; } @@ -120,9 +122,9 @@ module.exports = class BufferList { else this.head = this.tail = null; } else { - ret += str.slice(0, n); + ret += StringPrototypeSlice(str, 0, n); this.head = p; - p.data = str.slice(n); + p.data = StringPrototypeSlice(str, n); } break; } @@ -141,18 +143,20 @@ module.exports = class BufferList { do { const buf = p.data; if (n > buf.length) { - ret.set(buf, retLen - n); + TypedArrayPrototypeSet(ret, buf, retLen - n); n -= buf.length; } else { if (n === buf.length) { - ret.set(buf, retLen - n); + TypedArrayPrototypeSet(ret, buf, retLen - n); ++c; if (p.next) this.head = p.next; else this.head = this.tail = null; } else { - ret.set(new Uint8Array(buf.buffer, buf.byteOffset, n), retLen - n); + TypedArrayPrototypeSet(ret, + new Uint8Array(buf.buffer, buf.byteOffset, n), + retLen - n); this.head = p; p.data = buf.slice(n); } diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index ff1bea5a415577..76df2aa0a34666 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -3,7 +3,10 @@ const { ERR_MULTIPLE_CALLBACK } = require('internal/errors').codes; -const { Symbol } = primordials; +const { + FunctionPrototypeCall, + Symbol, +} = primordials; const kDestroy = Symbol('kDestroy'); const kConstruct = Symbol('kConstruct'); @@ -93,7 +96,8 @@ function _destroy(self, err, cb) { try { const then = result.then; if (typeof then === 'function') { - then.call( + FunctionPrototypeCall( + then, result, function() { if (called) @@ -311,7 +315,8 @@ function constructNT(stream) { try { const then = result.then; if (typeof then === 'function') { - then.call( + FunctionPrototypeCall( + then, result, function() { // If the callback was invoked, do nothing further. diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 5bf850ea6f8b92..2f0043bf0b751e 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -3,6 +3,10 @@ 'use strict'; +const { + FunctionPrototype, + FunctionPrototypeCall, +} = primordials; const { ERR_STREAM_PREMATURE_CLOSE } = require('internal/errors').codes; @@ -53,7 +57,7 @@ function isWritableFinished(stream) { return wState.finished || (wState.ended && wState.length === 0); } -function nop() {} +const nop = FunctionPrototype; function isReadableEnded(stream) { if (stream.readableEnded) return true; @@ -110,7 +114,7 @@ function eos(stream, options, callback) { if (stream.destroyed) willEmitClose = false; if (willEmitClose && (!stream.readable || readable)) return; - if (!readable || readableEnded) callback.call(stream); + if (!readable || readableEnded) FunctionPrototypeCall(callback, stream); }; let readableEnded = stream.readableEnded || @@ -123,23 +127,25 @@ function eos(stream, options, callback) { if (stream.destroyed) willEmitClose = false; if (willEmitClose && (!stream.writable || writable)) return; - if (!writable || writableFinished) callback.call(stream); + if (!writable || writableFinished) FunctionPrototypeCall(callback, stream); }; const onerror = (err) => { - callback.call(stream, err); + FunctionPrototypeCall(callback, stream, err); }; const onclose = () => { if (readable && !readableEnded) { if (!isReadableEnded(stream)) - return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); + return FunctionPrototypeCall(callback, stream, + new ERR_STREAM_PREMATURE_CLOSE()); } if (writable && !writableFinished) { if (!isWritableFinished(stream)) - return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); + return FunctionPrototypeCall(callback, stream, + new ERR_STREAM_PREMATURE_CLOSE()); } - callback.call(stream); + FunctionPrototypeCall(callback, stream); }; const onrequest = () => { diff --git a/lib/internal/streams/from.js b/lib/internal/streams/from.js index 949d0ceb6dae50..c4e06945260d5f 100644 --- a/lib/internal/streams/from.js +++ b/lib/internal/streams/from.js @@ -1,6 +1,7 @@ 'use strict'; const { + PromisePrototypeThen, SymbolAsyncIterator, SymbolIterator } = primordials; @@ -55,7 +56,8 @@ function from(Readable, iterable, opts) { readable._destroy = function(error, cb) { if (needToClose) { needToClose = false; - close().then( + PromisePrototypeThen( + close(), () => process.nextTick(cb, error), (e) => process.nextTick(cb, error || e), ); diff --git a/lib/internal/streams/lazy_transform.js b/lib/internal/streams/lazy_transform.js index 555e6430e33588..ad072e3474b3e1 100644 --- a/lib/internal/streams/lazy_transform.js +++ b/lib/internal/streams/lazy_transform.js @@ -4,6 +4,7 @@ 'use strict'; const { + FunctionPrototypeCall, ObjectDefineProperties, ObjectDefineProperty, ObjectSetPrototypeOf, @@ -25,7 +26,7 @@ ObjectSetPrototypeOf(LazyTransform, stream.Transform); function makeGetter(name) { return function() { - stream.Transform.call(this, this._options); + FunctionPrototypeCall(stream.Transform, this, this._options); this._writableState.decodeStrings = false; if (!this._options || !this._options.defaultEncoding) { diff --git a/lib/internal/streams/legacy.js b/lib/internal/streams/legacy.js index 0a0d0571c46378..d08df00259033b 100644 --- a/lib/internal/streams/legacy.js +++ b/lib/internal/streams/legacy.js @@ -2,13 +2,15 @@ const { ArrayIsArray, + ArrayPrototypeUnshift, + FunctionPrototypeCall, ObjectSetPrototypeOf, } = primordials; const EE = require('events'); function Stream(opts) { - EE.call(this, opts); + FunctionPrototypeCall(EE, this, opts); } ObjectSetPrototypeOf(Stream.prototype, EE.prototype); ObjectSetPrototypeOf(Stream, EE); @@ -106,7 +108,7 @@ function prependListener(emitter, event, fn) { if (!emitter._events || !emitter._events[event]) emitter.on(event, fn); else if (ArrayIsArray(emitter._events[event])) - emitter._events[event].unshift(fn); + ArrayPrototypeUnshift(emitter._events[event], fn); else emitter._events[event] = [fn, emitter._events[event]]; } diff --git a/lib/internal/streams/passthrough.js b/lib/internal/streams/passthrough.js index d37f9caf0116b5..acc1a148a7d7c1 100644 --- a/lib/internal/streams/passthrough.js +++ b/lib/internal/streams/passthrough.js @@ -26,6 +26,7 @@ 'use strict'; const { + FunctionPrototypeCall, ObjectSetPrototypeOf, } = primordials; @@ -39,7 +40,7 @@ function PassThrough(options) { if (!(this instanceof PassThrough)) return new PassThrough(options); - Transform.call(this, options); + FunctionPrototypeCall(Transform, this, options); } PassThrough.prototype._transform = function(chunk, encoding, cb) { diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 900e561d32943c..876195b10c124a 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -5,6 +5,10 @@ const { ArrayIsArray, + ArrayPrototypePop, + ArrayPrototypePush, + ArrayPrototypeShift, + FunctionPrototypeCall, ReflectApply, SymbolAsyncIterator, SymbolIterator, @@ -75,7 +79,7 @@ function popCallback(streams) { // a single stream. Therefore optimize for the average case instead of // checking for length === 0 as well. validateCallback(streams[streams.length - 1]); - return streams.pop(); + return ArrayPrototypePop(streams); } function isReadable(obj) { @@ -114,7 +118,7 @@ async function* fromReadable(val) { Readable = require('internal/streams/readable'); } - yield* Readable.prototype[SymbolAsyncIterator].call(val); + yield* FunctionPrototypeCall(Readable.prototype[SymbolAsyncIterator], val); } async function pump(iterable, writable, finish) { @@ -171,7 +175,7 @@ function pipeline(...streams) { } while (destroys.length) { - destroys.shift()(error); + ArrayPrototypeShift(destroys)(error); } if (final) { @@ -187,7 +191,7 @@ function pipeline(...streams) { if (isStream(stream)) { finishCount++; - destroys.push(destroyer(stream, reading, writing, finish)); + ArrayPrototypePush(destroys, destroyer(stream, reading, writing, finish)); } if (i === 0) { @@ -250,7 +254,7 @@ function pipeline(...streams) { ret = pt; finishCount++; - destroys.push(destroyer(ret, false, true, finish)); + ArrayPrototypePush(destroys, destroyer(ret, false, true, finish)); } } else if (isStream(stream)) { if (isReadable(ret)) { diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index d0424edc1fa32b..3479dd970b5543 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -22,6 +22,12 @@ 'use strict'; const { + ArrayPrototypeIndexOf, + ArrayPrototypePush, + ArrayPrototypeSplice, + FunctionPrototype, + FunctionPrototypeBind, + FunctionPrototypeCall, NumberIsInteger, NumberIsNaN, NumberParseInt, @@ -29,7 +35,8 @@ const { ObjectKeys, ObjectSetPrototypeOf, Promise, - Set, + ReflectApply, + SafeSet, SymbolAsyncIterator, Symbol } = primordials; @@ -70,7 +77,7 @@ let from; ObjectSetPrototypeOf(Readable.prototype, Stream.prototype); ObjectSetPrototypeOf(Readable, Stream); -function nop() {} +const nop = FunctionPrototype; const { errorOrDestroy } = destroyImpl; @@ -200,7 +207,7 @@ function Readable(options) { addAbortSignalNoValidate(options.signal, this); } - Stream.call(this, options); + FunctionPrototypeCall(Stream, this, options); destroyImpl.construct(this, () => { if (this._readableState.needReadable) { @@ -654,13 +661,13 @@ Readable.prototype.pipe = function(dest, pipeOpts) { if (state.pipes.length === 1) { if (!state.multiAwaitDrain) { state.multiAwaitDrain = true; - state.awaitDrainWriters = new Set( + state.awaitDrainWriters = new SafeSet( state.awaitDrainWriters ? [state.awaitDrainWriters] : [] ); } } - state.pipes.push(dest); + ArrayPrototypePush(state.pipes, dest); debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts); const doEnd = (!pipeOpts || pipeOpts.end !== false) && @@ -853,11 +860,11 @@ Readable.prototype.unpipe = function(dest) { } // Try to find the right one. - const index = state.pipes.indexOf(dest); + const index = ArrayPrototypeIndexOf(state.pipes, dest); if (index === -1) return this; - state.pipes.splice(index, 1); + ArrayPrototypeSplice(state.pipes, index, 1); if (state.pipes.length === 0) this.pause(); @@ -869,7 +876,7 @@ Readable.prototype.unpipe = function(dest) { // Set up data events if they are asked for // Ensure readable listeners eventually get something. Readable.prototype.on = function(ev, fn) { - const res = Stream.prototype.on.call(this, ev, fn); + const res = FunctionPrototypeCall(Stream.prototype.on, this, ev, fn); const state = this._readableState; if (ev === 'data') { @@ -899,7 +906,8 @@ Readable.prototype.on = function(ev, fn) { Readable.prototype.addListener = Readable.prototype.on; Readable.prototype.removeListener = function(ev, fn) { - const res = Stream.prototype.removeListener.call(this, ev, fn); + const res = FunctionPrototypeCall(Stream.prototype.removeListener, this, + ev, fn); if (ev === 'readable') { // We need to check if there is someone still listening to @@ -916,7 +924,8 @@ Readable.prototype.removeListener = function(ev, fn) { Readable.prototype.off = Readable.prototype.removeListener; Readable.prototype.removeAllListeners = function(ev) { - const res = Stream.prototype.removeAllListeners.apply(this, arguments); + const res = ReflectApply(Stream.prototype.removeAllListeners, this, + arguments); if (ev === 'readable' || ev === undefined) { // We need to check if there is someone still listening to @@ -1049,7 +1058,7 @@ Readable.prototype.wrap = function(stream) { // Proxy all the other methods. Important when wrapping filters and duplexes. for (const i of ObjectKeys(stream)) { if (this[i] === undefined && typeof stream[i] === 'function') { - this[i] = stream[i].bind(stream); + this[i] = FunctionPrototypeBind(stream[i], stream); } } @@ -1100,15 +1109,15 @@ async function* createAsyncIterator(stream) { .on('error', function(err) { error = err; errorEmitted = true; - next.call(this); + FunctionPrototypeCall(next, this); }) .on('end', function() { endEmitted = true; - next.call(this); + FunctionPrototypeCall(next, this); }) .on('close', function() { closeEmitted = true; - next.call(this); + FunctionPrototypeCall(next, this); }); try { diff --git a/lib/internal/streams/transform.js b/lib/internal/streams/transform.js index 26e0b07c2956c8..971bf5126f250d 100644 --- a/lib/internal/streams/transform.js +++ b/lib/internal/streams/transform.js @@ -64,6 +64,7 @@ 'use strict'; const { + FunctionPrototypeCall, ObjectSetPrototypeOf, Symbol } = primordials; @@ -132,7 +133,8 @@ function final(cb) { try { const then = result.then; if (typeof then === 'function') { - then.call( + FunctionPrototypeCall( + then, result, (data) => { if (called) @@ -165,7 +167,7 @@ function final(cb) { function prefinish() { if (this._final !== final) { - final.call(this); + FunctionPrototypeCall(final, this); } } @@ -207,7 +209,8 @@ Transform.prototype._write = function(chunk, encoding, callback) { try { const then = result.then; if (typeof then === 'function') { - then.call( + FunctionPrototypeCall( + then, result, (val) => { if (called) diff --git a/lib/internal/streams/writable.js b/lib/internal/streams/writable.js index 2510398d999fe1..2324dc579dc514 100644 --- a/lib/internal/streams/writable.js +++ b/lib/internal/streams/writable.js @@ -26,11 +26,16 @@ 'use strict'; const { - FunctionPrototype, + ArrayPrototypePush, + ArrayPrototypeSlice, + ArrayPrototypeSplice, Error, + FunctionPrototypeCall, + FunctionPrototypeSymbolHasInstance, ObjectDefineProperty, ObjectDefineProperties, ObjectSetPrototypeOf, + StringPrototypeToLowerCase, Symbol, SymbolHasInstance, } = primordials; @@ -206,7 +211,7 @@ function resetBuffer(state) { } WritableState.prototype.getBuffer = function getBuffer() { - return this.buffered.slice(this.bufferedIndex); + return ArrayPrototypeSlice(this.buffered, this.bufferedIndex); }; ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount', { @@ -215,27 +220,6 @@ ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount', { } }); -// Test _writableState for inheritance to account for Duplex streams, -// whose prototype chain only points to Readable. -let realHasInstance; -if (typeof Symbol === 'function' && SymbolHasInstance) { - realHasInstance = FunctionPrototype[SymbolHasInstance]; - ObjectDefineProperty(Writable, SymbolHasInstance, { - value: function(object) { - if (realHasInstance.call(this, object)) - return true; - if (this !== Writable) - return false; - - return object && object._writableState instanceof WritableState; - } - }); -} else { - realHasInstance = function(object) { - return object instanceof this; - }; -} - function Writable(options) { // Writable ctor is applied to Duplexes, too. // `realHasInstance` is necessary because using plain `instanceof` @@ -249,7 +233,7 @@ function Writable(options) { // the WritableState constructor, at least with V8 6.5. const isDuplex = (this instanceof Stream.Duplex); - if (!isDuplex && !realHasInstance.call(Writable, this)) + if (!isDuplex && !FunctionPrototypeSymbolHasInstance(Writable, this)) return new Writable(options); this._writableState = new WritableState(options, this, isDuplex); @@ -273,7 +257,7 @@ function Writable(options) { addAbortSignalNoValidate(options.signal, this); } - Stream.call(this, options); + FunctionPrototypeCall(Stream, this, options); destroyImpl.construct(this, () => { const state = this._writableState; @@ -286,6 +270,15 @@ function Writable(options) { }); } +ObjectDefineProperty(Writable, SymbolHasInstance, { + value: function(object) { + if (FunctionPrototypeSymbolHasInstance(this, object)) return true; + if (this !== Writable) return false; + + return object && object._writableState instanceof WritableState; + }, +}); + // Otherwise people can pipe Writable streams, which is just wrong. Writable.prototype.pipe = function() { errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE()); @@ -363,7 +356,7 @@ Writable.prototype.uncork = function() { Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) { // node::ParseEncoding() requires lower case. if (typeof encoding === 'string') - encoding = encoding.toLowerCase(); + encoding = StringPrototypeToLowerCase(encoding); if (!Buffer.isEncoding(encoding)) throw new ERR_UNKNOWN_ENCODING(encoding); this._writableState.defaultEncoding = encoding; @@ -528,7 +521,7 @@ function errorBuffer(state) { callback(new ERR_STREAM_DESTROYED('write')); } - for (const callback of state[kOnFinished].splice(0)) { + for (const callback of ArrayPrototypeSplice(state[kOnFinished], 0)) { callback(new ERR_STREAM_DESTROYED('end')); } @@ -564,7 +557,8 @@ function clearBuffer(stream, state) { }; // Make a copy of `buffered` if it's going to be used by `callback` above, // since `doWrite` will mutate the array. - const chunks = state.allNoop && i === 0 ? buffered : buffered.slice(i); + const chunks = state.allNoop && i === 0 ? + buffered : ArrayPrototypeSlice(buffered, i); chunks.allBuffers = state.allBuffers; doWrite(stream, state, true, state.length, chunks, '', callback); @@ -581,7 +575,7 @@ function clearBuffer(stream, state) { if (i === buffered.length) { resetBuffer(state); } else if (i > 256) { - buffered.splice(0, i); + ArrayPrototypeSplice(buffered, 0, i); state.bufferedIndex = 0; } else { state.bufferedIndex = i; @@ -649,7 +643,7 @@ Writable.prototype.end = function(chunk, encoding, cb) { if (err || state.finished) { process.nextTick(cb, err); } else { - state[kOnFinished].push(cb); + ArrayPrototypePush(state[kOnFinished], cb); } } @@ -672,7 +666,7 @@ function callFinal(stream, state) { const result = stream._final((err) => { state.pendingcb--; if (err) { - for (const callback of state[kOnFinished].splice(0)) { + for (const callback of ArrayPrototypeSplice(state[kOnFinished], 0)) { callback(err); } errorOrDestroy(stream, err, state.sync); @@ -690,7 +684,8 @@ function callFinal(stream, state) { try { const then = result.then; if (typeof then === 'function') { - then.call( + FunctionPrototypeCall( + then, result, function() { if (state.prefinished) @@ -701,8 +696,8 @@ function callFinal(stream, state) { process.nextTick(finish, stream, state); }, function(err) { - for (const callback of state[kOnFinished].splice(0)) { - process.nextTick(callback, err); + for (const cb of ArrayPrototypeSplice(state[kOnFinished], 0)) { + process.nextTick(cb, err); } process.nextTick(errorOrDestroy, stream, err, state.sync); }); @@ -748,7 +743,7 @@ function finish(stream, state) { state.finished = true; - for (const callback of state[kOnFinished].splice(0)) { + for (const callback of ArrayPrototypeSplice(state[kOnFinished], 0)) { callback(); } @@ -866,7 +861,7 @@ Writable.prototype.destroy = function(err, cb) { process.nextTick(errorBuffer, state); } - destroy.call(this, err, cb); + FunctionPrototypeCall(destroy, this, err, cb); return this; }; diff --git a/test/parallel/test-stream-pipe-await-drain.js b/test/parallel/test-stream-pipe-await-drain.js index 90d418a09783e3..35b86f67f99676 100644 --- a/test/parallel/test-stream-pipe-await-drain.js +++ b/test/parallel/test-stream-pipe-await-drain.js @@ -27,7 +27,7 @@ writer1.once('chunk-received', () => { reader._readableState.awaitDrainWriters.size, 0, 'awaitDrain initial value should be 0, actual is ' + - reader._readableState.awaitDrainWriters + reader._readableState.awaitDrainWriters.size ); setImmediate(() => { // This one should *not* get through to writer1 because writer2 is not