Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: refactor to use more primordials #36346

Merged
merged 1 commit into from
Feb 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 10 additions & 6 deletions lib/internal/streams/buffer_list.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
'use strict';

const {
StringPrototypeSlice,
SymbolIterator,
TypedArrayPrototypeSet,
Uint8Array,
} = primordials;

Expand Down Expand Up @@ -67,7 +69,7 @@ module.exports = class BufferList {
let p = this.head;
let i = 0;
while (p) {
ret.set(p.data, i);
TypedArrayPrototypeSet(ret, p.data, i);
i += p.data.length;
p = p.next;
}
Expand Down Expand Up @@ -120,9 +122,9 @@ module.exports = class BufferList {
else
this.head = this.tail = null;
} else {
ret += str.slice(0, n);
ret += StringPrototypeSlice(str, 0, n);
this.head = p;
p.data = str.slice(n);
p.data = StringPrototypeSlice(str, n);
}
break;
}
Expand All @@ -141,18 +143,20 @@ module.exports = class BufferList {
do {
const buf = p.data;
if (n > buf.length) {
ret.set(buf, retLen - n);
TypedArrayPrototypeSet(ret, buf, retLen - n);
n -= buf.length;
} else {
if (n === buf.length) {
ret.set(buf, retLen - n);
TypedArrayPrototypeSet(ret, buf, retLen - n);
++c;
if (p.next)
this.head = p.next;
else
this.head = this.tail = null;
} else {
ret.set(new Uint8Array(buf.buffer, buf.byteOffset, n), retLen - n);
TypedArrayPrototypeSet(ret,
new Uint8Array(buf.buffer, buf.byteOffset, n),
retLen - n);
this.head = p;
p.data = buf.slice(n);
}
Expand Down
11 changes: 8 additions & 3 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
const {
ERR_MULTIPLE_CALLBACK
} = require('internal/errors').codes;
const { Symbol } = primordials;
const {
FunctionPrototypeCall,
Symbol,
} = primordials;

const kDestroy = Symbol('kDestroy');
const kConstruct = Symbol('kConstruct');
Expand Down Expand Up @@ -93,7 +96,8 @@ function _destroy(self, err, cb) {
try {
const then = result.then;
if (typeof then === 'function') {
then.call(
FunctionPrototypeCall(
then,
result,
function() {
if (called)
Expand Down Expand Up @@ -311,7 +315,8 @@ function constructNT(stream) {
try {
const then = result.then;
if (typeof then === 'function') {
then.call(
FunctionPrototypeCall(
then,
result,
function() {
// If the callback was invoked, do nothing further.
Expand Down
20 changes: 13 additions & 7 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@

'use strict';

const {
FunctionPrototype,
FunctionPrototypeCall,
} = primordials;
const {
ERR_STREAM_PREMATURE_CLOSE
} = require('internal/errors').codes;
Expand Down Expand Up @@ -53,7 +57,7 @@ function isWritableFinished(stream) {
return wState.finished || (wState.ended && wState.length === 0);
}

function nop() {}
const nop = FunctionPrototype;

function isReadableEnded(stream) {
if (stream.readableEnded) return true;
Expand Down Expand Up @@ -110,7 +114,7 @@ function eos(stream, options, callback) {
if (stream.destroyed) willEmitClose = false;

if (willEmitClose && (!stream.readable || readable)) return;
if (!readable || readableEnded) callback.call(stream);
if (!readable || readableEnded) FunctionPrototypeCall(callback, stream);
};

let readableEnded = stream.readableEnded ||
Expand All @@ -123,23 +127,25 @@ function eos(stream, options, callback) {
if (stream.destroyed) willEmitClose = false;

if (willEmitClose && (!stream.writable || writable)) return;
if (!writable || writableFinished) callback.call(stream);
if (!writable || writableFinished) FunctionPrototypeCall(callback, stream);
};

const onerror = (err) => {
callback.call(stream, err);
FunctionPrototypeCall(callback, stream, err);
};

const onclose = () => {
if (readable && !readableEnded) {
if (!isReadableEnded(stream))
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
return FunctionPrototypeCall(callback, stream,
new ERR_STREAM_PREMATURE_CLOSE());
}
if (writable && !writableFinished) {
if (!isWritableFinished(stream))
return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
return FunctionPrototypeCall(callback, stream,
new ERR_STREAM_PREMATURE_CLOSE());
}
callback.call(stream);
FunctionPrototypeCall(callback, stream);
};

const onrequest = () => {
Expand Down
4 changes: 3 additions & 1 deletion lib/internal/streams/from.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

const {
PromisePrototypeThen,
SymbolAsyncIterator,
SymbolIterator
} = primordials;
Expand Down Expand Up @@ -55,7 +56,8 @@ function from(Readable, iterable, opts) {
readable._destroy = function(error, cb) {
if (needToClose) {
needToClose = false;
close().then(
PromisePrototypeThen(
close(),
() => process.nextTick(cb, error),
(e) => process.nextTick(cb, error || e),
);
Expand Down
3 changes: 2 additions & 1 deletion lib/internal/streams/lazy_transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
'use strict';

const {
FunctionPrototypeCall,
ObjectDefineProperties,
ObjectDefineProperty,
ObjectSetPrototypeOf,
Expand All @@ -25,7 +26,7 @@ ObjectSetPrototypeOf(LazyTransform, stream.Transform);

function makeGetter(name) {
return function() {
stream.Transform.call(this, this._options);
FunctionPrototypeCall(stream.Transform, this, this._options);
this._writableState.decodeStrings = false;

if (!this._options || !this._options.defaultEncoding) {
Expand Down
6 changes: 4 additions & 2 deletions lib/internal/streams/legacy.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

const {
ArrayIsArray,
ArrayPrototypeUnshift,
FunctionPrototypeCall,
ObjectSetPrototypeOf,
} = primordials;

const EE = require('events');

function Stream(opts) {
EE.call(this, opts);
FunctionPrototypeCall(EE, this, opts);
}
ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
ObjectSetPrototypeOf(Stream, EE);
Expand Down Expand Up @@ -106,7 +108,7 @@ function prependListener(emitter, event, fn) {
if (!emitter._events || !emitter._events[event])
emitter.on(event, fn);
else if (ArrayIsArray(emitter._events[event]))
emitter._events[event].unshift(fn);
ArrayPrototypeUnshift(emitter._events[event], fn);
else
emitter._events[event] = [fn, emitter._events[event]];
}
Expand Down
3 changes: 2 additions & 1 deletion lib/internal/streams/passthrough.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
'use strict';

const {
FunctionPrototypeCall,
ObjectSetPrototypeOf,
} = primordials;

Expand All @@ -39,7 +40,7 @@ function PassThrough(options) {
if (!(this instanceof PassThrough))
return new PassThrough(options);

Transform.call(this, options);
FunctionPrototypeCall(Transform, this, options);
}

PassThrough.prototype._transform = function(chunk, encoding, cb) {
Expand Down
14 changes: 9 additions & 5 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

const {
ArrayIsArray,
ArrayPrototypePop,
ArrayPrototypePush,
ArrayPrototypeShift,
FunctionPrototypeCall,
ReflectApply,
SymbolAsyncIterator,
SymbolIterator,
Expand Down Expand Up @@ -75,7 +79,7 @@ function popCallback(streams) {
// a single stream. Therefore optimize for the average case instead of
// checking for length === 0 as well.
validateCallback(streams[streams.length - 1]);
return streams.pop();
return ArrayPrototypePop(streams);
}

function isReadable(obj) {
Expand Down Expand Up @@ -114,7 +118,7 @@ async function* fromReadable(val) {
Readable = require('internal/streams/readable');
}

yield* Readable.prototype[SymbolAsyncIterator].call(val);
yield* FunctionPrototypeCall(Readable.prototype[SymbolAsyncIterator], val);
}

async function pump(iterable, writable, finish) {
Expand Down Expand Up @@ -171,7 +175,7 @@ function pipeline(...streams) {
}

while (destroys.length) {
destroys.shift()(error);
ArrayPrototypeShift(destroys)(error);
}

if (final) {
Expand All @@ -187,7 +191,7 @@ function pipeline(...streams) {

if (isStream(stream)) {
finishCount++;
destroys.push(destroyer(stream, reading, writing, finish));
ArrayPrototypePush(destroys, destroyer(stream, reading, writing, finish));
}

if (i === 0) {
Expand Down Expand Up @@ -250,7 +254,7 @@ function pipeline(...streams) {
ret = pt;

finishCount++;
destroys.push(destroyer(ret, false, true, finish));
ArrayPrototypePush(destroys, destroyer(ret, false, true, finish));
}
} else if (isStream(stream)) {
if (isReadable(ret)) {
Expand Down